Using multiprocessing concurrency mechanisms in Celery tasks



I'm trying to interface with a device that can only accept a single TCP connection (a memory constraint), so I can't simply open a connection per worker the way you would in a normal client-server situation such as a database connection.

I've tried using a multiprocessing Manager dict, accessible globally across threads, in the format:

clients{(address, port): (connection_obj, multiprocessing.Manager.RLock)}
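For context, a minimal sketch of how the shared manager and clients dict might be set up in the .celery module the task imports from (the names mirror the import below; the app name and broker URL are placeholders):

import multiprocessing
from celery import Celery

app = Celery('proj', broker='amqp://localhost')  # placeholder app name / broker URL

# Shared state that the tasks import: a Manager-backed dict keyed by (address, port).
manager = multiprocessing.Manager()
clients = manager.dict()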

And a task like this:

from celery import shared_task
from .celery import manager, clients
@shared_task
def send_command(controller, commandname, args):
    """Send a command to the controller."""
    # Create client connection if one does not exist.
    conn = None
    addr, port = controller
    if controller not in clients:
        conn = Client(addr, port)
        conn.connect()
        lock = manager.RLock()
        clients[controller] = (conn, lock,)
        print("New controller connection to %s:%s" % (addr, port,))
    else:
        conn, lock = clients[controller]
    try:
        f = getattr(conn, commandname) # See if connection.commandname() exists.
    except Exception:
        raise Exception("command: %s not known." % (commandname))
    with lock:
        res = f(*args)
        return res
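For reference, the task would then be invoked roughly like this (the controller address and command name here are just placeholders):

send_command.delay(('192.168.1.50', 5000), 'get_status', [])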

However, the task fails with a serialization error such as:

_pickle.PicklingError: Can't pickle <class '_thread.lock'>: attribute lookup lock on _thread failed

Even though I'm not calling the task with a non-serializable value, and the task isn't trying to return a non-serializable value either, Celery seems obsessed with trying to serialize this global object?

What am I missing? How would you make a client device connection used inside Celery tasks thread-safe and accessible across threads? Example code?

What about using Redis to implement a distributed lock manager? The Redis Python client has locking built in; also see the documentation at redis.io. Even if you are using RabbitMQ or another broker, Redis is very lightweight.

For example, as a decorator:

from functools import wraps

import redis

from celery import shared_task

# Assumption: a module-level Redis connection; adjust host/port/db to your setup.
redisconn = redis.StrictRedis(host='localhost', port=6379, db=0)

def device_lock(block=True):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            return_value = None
            have_lock = False
            # One named lock serializes all access to the single device connection.
            lock = redisconn.lock('locks.device', timeout=2, sleep=0.01)
            try:
                have_lock = lock.acquire(blocking=block)
                if have_lock:
                    return_value = func(*args, **kwargs)
            finally:
                if have_lock:
                    lock.release()
            return return_value
        return wrapper
    return decorator

@shared_task
@device_lock()
def send_command(controller, commandname, args):
    """Send a command to the controller."""
    ...
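If a worker that fails to take the lock should try again later rather than silently returning None, one variation (just a sketch; the timeout, blocking_timeout, and retry values are arbitrary) is to acquire the lock inside the task and lean on Celery's retry mechanism:

@shared_task(bind=True, max_retries=10)
def send_command(self, controller, commandname, args):
    """Send a command to the controller, retrying if the device lock is busy."""
    lock = redisconn.lock('locks.device', timeout=2, blocking_timeout=0.5)
    if not lock.acquire():
        # Another worker holds the device; try again shortly.
        raise self.retry(countdown=1)
    try:
        ...
    finally:
        lock.release()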

You could also use this approach from the Celery task cookbook:

from celery import task
from celery.utils.log import get_task_logger
from django.core.cache import cache
from hashlib import md5
from djangofeeds.models import Feed
logger = get_task_logger(__name__)
LOCK_EXPIRE = 60 * 5 # Lock expires in 5 minutes
@task(bind=True)
def import_feed(self, feed_url):
    # The cache key consists of the task name and the MD5 digest
    # of the feed URL.
    feed_url_hexdigest = md5(feed_url.encode('utf-8')).hexdigest()
    lock_id = '{0}-lock-{1}'.format(self.name, feed_url_hexdigest)
    # cache.add fails if the key already exists
    acquire_lock = lambda: cache.add(lock_id, 'true', LOCK_EXPIRE)
    # memcache delete is very slow, but we have to use it to take
    # advantage of using add() for atomic locking
    release_lock = lambda: cache.delete(lock_id)
    logger.debug('Importing feed: %s', feed_url)
    if acquire_lock():
        try:
            feed = Feed.objects.import_feed(feed_url)
        finally:
            release_lock()
        return feed.url
    logger.debug(
        'Feed %s is already being imported by another worker', feed_url)
...
    self._send_bytes(ForkingPickler.dumps(obj))
  File "/usr/lib64/python3.4/multiprocessing/reduction.py", line 50, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <class '_thread.lock'>: attribute lookup lock on _thread failed

After poking around the internet for a bit, I realized I had probably missed something important in the traceback. Looking at it more closely, I realized that it is not Celery trying to pickle the connection object, but multiprocessing.reduction. Reduction is used to serialize on one side and rebuild on the other side.
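The underlying failure is easy to reproduce outside Celery; a plain thread lock is simply not picklable, so anything multiprocessing has to ship between processes must not contain one:

import pickle
import threading

# Raises a pickling error ("cannot pickle '_thread.lock' object" on recent
# Pythons, a PicklingError as in the traceback above on older ones).
pickle.dumps(threading.Lock())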

I have several alternative ways to solve this, but none of them do what I originally wanted: to borrow the client library's connection object and use it, which is simply not possible with multiprocessing and prefork.

Have you tried using gevent or eventlet Celery workers instead of processes and threads? In that case you would be able to use a global var or threading.local() to share the connection object.
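A minimal sketch of that idea, assuming the worker is started with a green pool (e.g. celery -A proj worker -P gevent) so every task runs in the same OS process and can share module-level state; Client here is the device client class from the question:

import threading

from celery import shared_task

# Module-level state shared by all greenlets in a gevent/eventlet worker.
_clients = {}
_clients_lock = threading.Lock()  # cooperative once the pool's monkey-patching is applied

def get_connection(controller):
    """Return the single connection for this controller, creating it lazily."""
    addr, port = controller
    with _clients_lock:
        if controller not in _clients:
            conn = Client(addr, port)  # Client: the device client class from the question
            conn.connect()
            _clients[controller] = conn
        return _clients[controller]

@shared_task
def send_command(controller, commandname, args):
    """Send a command to the controller over the shared connection."""
    conn = get_connection(controller)
    return getattr(conn, commandname)(*args)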
