在扭曲的共享对象中分叉多个进程



我想用twisted派生多个进程。从这里的讨论中我知道Twisted和多处理是不兼容的。的确,我可以从不同的终端启动不同的流程来实现相同的效果,但我不能这样做

有一个大对象(大小以GB为单位),我想在不同的python进程之间共享(由于我计算机上的RAM限制,无法在RAM上多次加载同一对象)

我想做的是

  1. 在一个进程中异步启动多个rabbit mq使用者
  2. 用一个公共共享对象分叉多个这样的进程,以利用我系统中的所有CPU

我能够实现第一步。以下是我的想法-

import pika
class PikaFactory(protocol.ReconnectingClientFactory):
def __init__(self, parameters, task_number=0, shared_obj=None):
self.parameters = parameters
self.task_count = total_tasks
self.task_number = task_number
self.shared_obj= shared_obj
def buildProtocol(self, addr):
self.resetDelay()
logger.info('Task: %s, Connected' % self.task_number)
proto = twisted_connection.TwistedProtocolConnection(self.parameters)
# run is a async function that consumes the rabbit-queue
proto.ready.addCallback(run, self.task_number, self.shared_obj)
return proto

# Rest of the implementation ...........
def run_tasks(shared_obj):
from twisted.internet import reactor
try:
parameters = pika.ConnectionParameters(**CONNECTION_PARAMETERS)
factory = PikaFactory(parameters, 0, shared_obj)
for i in range(total_tasks):
# Launch multiple async-tasks in the same process
reactor.connectTCP(parameters.host, parameters.port, factory)
logger.info(' [*] Waiting for messages. To exit press CTRL+C')
reactor.run()
except:
logger.exception("Error")
reactor.stop()
if __name__ == '__main__':
obj = Fibonacci()
run_tasks(obj)

现在,为了分叉多个进程,我已经编写了这段代码。

from multiprocessing.managers import BaseManager
class MyManager(BaseManager):
"""
This Manager is responsible for coordinating shared
information state between all processes
"""
pass
# Register your custom "Fibonacci" class with the manager
# This is the class I want to share among multiple processes
MyManager.register('Fibonacci', Fibonacci)
def Manager():
m = MyManager()
m.start()
return m
def run_multiple_processes():
manager = Manager()
# object I want to share among multiple processes 
fibonacci = manager.Fibonacci()
pool = multiprocessing.Pool(processes=workers)
for i in range(0, workers):
pool.apply_async(run_tasks, (fibonacci, ))
# Stay alive
try:
while True:
continue
except KeyboardInterrupt:
logger.error(' [*] Exiting...')
pool.terminate()
pool.join()

我在运行上面的代码时得到了一些随机错误,比如-

builtins.AttributeError: '_SIGCHLDWaker' object has no attribute 'doWrite'

启动多个进程并在它们之间共享自定义对象的扭曲方式是什么。不会对对象执行任何写入操作,该对象仅用于读取其属性。

提前谢谢。

在搜索时,我遇到了这个与python多处理不兼容的后扭曲。答案存在于这篇文章中,只引用了他的话——

"在创建子进程之前,不加载任何Twisted。这意味着在创建子过程之前,甚至不导入Twisted">

感谢@Jean-Paul Calderone的有益评论。

最新更新