I'm trying to interface with a device that can only accept a single TCP connection (memory constraints), so starting one connection per worker thread, as you would in a normal client-server situation such as a database connection, is not an option.
I've tried using a multiprocessing Manager dict that is globally accessible between threads, in the format:

clients = {(address, port): (connection_obj, multiprocessing.Manager().RLock())}
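A simplified sketch of how such globals could be set up in the .celery module the task imports from (hypothetical setup; only the names manager and clients come from the code below):

import multiprocessing

# Shared state the tasks import: a manager process owns the dict,
# and worker processes talk to it through proxies.
manager = multiprocessing.Manager()
clients = manager.dict()  # {(address, port): (connection_obj, RLock)}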
And a task like this:
from celery import shared_task
from .celery import manager, clients

@shared_task
def send_command(controller, commandname, args):
    """Send a command to the controller."""
    # Create client connection if one does not exist.
    conn = None
    addr, port = controller
    if controller not in clients:
        conn = Client(addr, port)
        conn.connect()
        lock = manager.RLock()
        clients[controller] = (conn, lock,)
        print("New controller connection to %s:%s" % (addr, port,))
    else:
        conn, lock = clients[controller]

    try:
        f = getattr(conn, commandname)  # See if connection.commandname() exists.
    except Exception:
        raise Exception("command: %s not known." % (commandname))

    with lock:
        res = f(*args)

    return res
However, the task fails with a serialization error such as:
_pickle.PicklingError: Can't pickle <class '_thread.lock'>: attribute lookup lock on _thread failed
Even though I'm not calling the task with an unserializable value, and the task isn't trying to return an unserializable value, Celery seems obsessed with trying to serialize this global object?

What am I missing? How would you go about making a client device connection used in Celery tasks thread-safe and accessible between threads? Example code?
What about using Redis as a distributed lock manager? The Redis Python client has built-in locking functionality; also see the documentation at redis.io. Even if you're using RabbitMQ or another broker, Redis is very lightweight.

For example, as a decorator:
from functools import wraps

import redis

# Assumes a module-level Redis connection; adjust to your deployment.
redisconn = redis.StrictRedis()

def device_lock(block=True):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            return_value = None
            have_lock = False
            lock = redisconn.lock('locks.device', timeout=2, sleep=0.01)
            try:
                have_lock = lock.acquire(blocking=block)
                if have_lock:
                    return_value = func(*args, **kwargs)
            finally:
                if have_lock:
                    lock.release()
            return return_value
        return wrapper
    return decorator
@shared_task
@device_lock()  # device_lock takes arguments, so call it when decorating
def send_command(controller, commandname, args):
    """Send a command to the controller."""
    ...
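For one-off cases you can also skip the decorator and use the lock object directly as a context manager; redis-py's Lock supports the with statement and raises LockError when the lock can't be acquired within blocking_timeout (the task name and body here are placeholders):

from redis.exceptions import LockError

@shared_task
def send_command_once(controller, commandname, args):
    """Variant that fails fast instead of queueing behind the device."""
    try:
        # blocking_timeout bounds how long we wait for the device lock.
        with redisconn.lock('locks.device', timeout=2, blocking_timeout=0.5):
            ...  # talk to the device here
    except LockError:
        ...  # lock not acquired in time: retry, requeue, or report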
You can also use this approach from the Celery task cookbook:
from celery import task
from celery.utils.log import get_task_logger
from django.core.cache import cache
from hashlib import md5
from djangofeeds.models import Feed

logger = get_task_logger(__name__)

LOCK_EXPIRE = 60 * 5  # Lock expires in 5 minutes

@task(bind=True)
def import_feed(self, feed_url):
    # The cache key consists of the task name and the MD5 digest
    # of the feed URL (md5 needs bytes, hence the encode).
    feed_url_hexdigest = md5(feed_url.encode('utf-8')).hexdigest()
    lock_id = '{0}-lock-{1}'.format(self.name, feed_url_hexdigest)

    # cache.add fails if the key already exists
    acquire_lock = lambda: cache.add(lock_id, 'true', LOCK_EXPIRE)

    # memcache delete is very slow, but we have to use it to take
    # advantage of using add() for atomic locking
    release_lock = lambda: cache.delete(lock_id)

    logger.debug('Importing feed: %s', feed_url)
    if acquire_lock():
        try:
            feed = Feed.objects.import_feed(feed_url)
        finally:
            release_lock()
        return feed.url

    logger.debug(
        'Feed %s is already being imported by another worker', feed_url)
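Adapted to your device case, the same cache.add pattern might look something like this (a sketch: the per-controller lock key and the retry-on-busy behavior are illustrative additions, using Celery's standard self.retry):

@shared_task(bind=True)
def send_command(self, controller, commandname, args):
    """Send a command, retrying if another worker holds the device."""
    addr, port = controller
    lock_id = 'lock-device-{0}-{1}'.format(addr, port)
    if not cache.add(lock_id, 'true', LOCK_EXPIRE):
        # Someone else is talking to the device; try again shortly.
        raise self.retry(countdown=1)
    try:
        ...  # open/reuse the connection and send the command here
    finally:
        cache.delete(lock_id)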
...
    self._send_bytes(ForkingPickler.dumps(obj))
  File "/usr/lib64/python3.4/multiprocessing/reduction.py", line 50, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <class '_thread.lock'>: attribute lookup lock on _thread failed
After poking around the internet some more, I realized I had probably missed something important in the traceback. Looking at it more closely, I realized it wasn't Celery trying to pickle the connection object, but multiprocessing.reduction. Reduction is used to serialize on one side and rebuild on the other.
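This is easy to confirm in isolation: storing a value in a Manager dict ships it through a pickle channel to the manager process, so anything holding a raw lock or socket fails exactly like the task does (a minimal standalone demonstration):

import multiprocessing
import threading

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    d = manager.dict()
    try:
        # The proxy pickles the value to send it to the manager process.
        d['conn'] = threading.Lock()
    except Exception as exc:
        # Fails with a pickling error, just like the traceback above.
        print(type(exc).__name__, exc)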
I have several alternative approaches to solve this, but none of them does what I originally intended: borrowing the client library's connection object and using it directly, which isn't possible with multiprocessing and prefork.
Have you tried using gevent or eventlet Celery workers instead of processes and threads? In that case you would be able to use a global var or threading.local() to share the connection object.
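A minimal sketch of that approach, assuming the question's Client class and a worker started with e.g. celery -A proj worker -P eventlet (with such a pool all greenlets live in one process and can share a module-level connection):

from celery import shared_task

_conn = None  # one connection per worker process, shared by all greenlets

def get_connection(addr, port):
    """Lazily open the single device connection and reuse it."""
    global _conn
    if _conn is None:
        _conn = Client(addr, port)  # Client is the question's connection class
        _conn.connect()
    return _conn

@shared_task
def send_command(controller, commandname, args):
    conn = get_connection(*controller)
    return getattr(conn, commandname)(*args)

If commands must not interleave on the wire, you would still serialize access (for example with one of the locks shown above), but the connection object itself never has to be pickled.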