在多个独立进程中使用 Python RLocks



我正在做一个Django项目,该项目使用Celery来安排一些长期任务。Django 和 Celery 都在完全独立的进程中运行,并且需要一种方法来协调对数据库的访问。我想使用 Python 的 multiprocessing.RLock 类(或等效类),因为我需要锁是可重入的。

我的问题是,如何为单独的进程提供对 RLock 的访问?

我发现的两个最好的解决方案(posix_ipc模块和fcntl)仅限于基于Unix的系统,我们希望避免将自己限制在这一点上。

是否有一种跨平台的方法可以在进程之间共享锁,而无需具有共同的祖先进程?

我最终使用RabbitMQ作为创建分布式锁的一种方式。有关如何执行此操作的详细信息可以在RabbitMQ的博客上找到:https://www.rabbitmq.com/blog/2014/02/19/distributed-semaphores-with-rabbitmq/。

简而言之,您为锁创建一个 RabbitMQ 队列,并向其发送一条消息。要获取锁,请在队列上运行basic_get(非阻塞)或basic_consume(阻塞)。这会从队列中删除消息,从而防止其他线程获取锁。工作完成后,发送否定确认将导致 RabbitMQ 对消息重新排队,从而允许下一个线程继续。

不幸的是,这不允许重入锁。

上面引用的链接提供了有关如何执行此操作的 Java 代码。弄清楚如何将其转换为Python/Pika已经足够烦人了,以至于我认为我应该在这里发布一些示例代码。

要生成锁:

import pika
with pika.BlockingConnection(pika.ConnectionParameters('localhost')) as connection:
    channel = connection.channel()
    channel.queue_declare(queue="LockQueue")
    channel.basic_publish(exchange='', routing_key='LockQueue', body='Lock')
    channel.close()

获取锁:

import pika
import time
def callback(ch, method, properties, body):
    print("Got lock")
    for i in range(5, 0, -1):
        print("Tick {}".format(i))
        time.sleep(1)
    print("Releasing lock")
    ch.basic_nack(delivery_tag=method.delivery_tag)
    ch.close()  # Close the channel to continue on with normal processing. Without this, `callback` will continue to request the lock.
with pika.BlockingConnection(pika.ConnectionParameters('localhost')) as connection:
    channel = connection.channel()
    channel.queue_declare(queue='LockQueue')
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(callback, queue='LockQueue')
    print("Waiting for lock")
    channel.start_consuming()
    print("Task completed")

相关内容

  • 没有找到相关文章

最新更新