属性错误:无法获取<模块__mp_main__上的属性'journalerReader'



我尝试在python中实现Lmax。我尝试在4个进程中处理数据

import disruptor  
import multiprocessing
import random
if __name__ == '__main__':
cb = disruptor.CircularBuffer(5)
def receiveWriter():
while(True):
n = random.randint(5,20)
cb.receive(n)
def ReplicatorReader():
while(True):
cb.replicator()
def journalerReader():
while(True):
cb.journaler()
def unmarshallerReader():
while(True):
cb.unmarshaller()
def consumeReader():
while(True):
print(cb.consume())



p1 = multiprocessing.Process(name="p1",target=ReplicatorReader)
p1.start()
p0 = multiprocessing.Process(name="p0",target=receiveWriter)
p0.start()

p1 = multiprocessing.Process(name="p1",target=ReplicatorReader)
p1.start()
p2 = multiprocessing.Process(name="p2",target=journalerReader)
p2.start()
p3 = multiprocessing.Process(name="p3",target=unmarshallerReader)
p3.start()
p4 = multiprocessing.Process(name="p4",target=consumeReader)
p4.start()

但是我在我的代码中得到这个错误:

Traceback (most recent call last):
File "<string>", line 1, in <module>
File "<string>", line 1, in <module>
File "C:Program FilesPython39libmultiprocessingspawn.py", line 116, in spawn_main
File "C:Program FilesPython39libmultiprocessingspawn.py", line 116, in spawn_main
exitcode = _main(fd, parent_sentinel)
exitcode = _main(fd, parent_sentinel)
File "C:Program FilesPython39libmultiprocessingspawn.py", line 126, in _main
File "C:Program FilesPython39libmultiprocessingspawn.py", line 126, in _main
self = reduction.pickle.load(from_parent)
self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'unmarshallerReader' on <module '__mp_main__' from 'd:\python\RunDisruptor.py'>
AttributeError: Can't get attribute 'consumeReader' on <module '__mp_main__' from 'd:\python\RunDisruptor.py'>

您的第一个问题是Process调用的目标不能在if __name__ == '__main__':块内。但是:

正如我在你之前的一篇文章中提到的,我认为你可以跨多个进程共享CircularBuffer实例的唯一方法是实现一个管理的类,令人惊讶的是,这并不是那么困难。但是,当您创建一个托管类并创建该类的实例时,您所拥有的实际上是对该对象的代理引用。这有两个含义:
  1. 每个方法调用更像是对您将启动的管理器创建的特殊服务器进程的远程过程调用,因此比本地方法调用有更多的开销。
  2. 如果你打印引用,类的__str__方法将不会被调用;您将打印代理指针的表示形式。你可能应该将__str__方法重命名为dump之类的东西,并在你想要实例的表示时显式调用它。

您还应该显式地等待正在创建的进程完成,以便管理器服务不会过早关闭,这意味着应该为每个进程分配一个唯一的变量并具有唯一的名称。

import disruptor
import multiprocessing
from multiprocessing.managers import BaseManager
import random
class CircularBufferManager(BaseManager):
pass
def receiveWriter(cb):
while(True):
n = random.randint(5,20)
cb.receive(n)
def ReplicatorReader(cb):
while(True):
cb.replicator()
def journalerReader(cb):
while(True):
cb.journaler()
def unmarshallerReader(cb):
while(True):
cb.unmarshaller()
def consumeReader(cb):
while(True):
print(cb.consume())
if __name__ == '__main__':
# Create managed class
CircularBufferManager.register('CircularBuffer', disruptor.CircularBuffer)
# create and start manager:
with CircularBufferManager() as manager:
cb = manager.CircularBuffer(5)
p1 = multiprocessing.Process(name="p1", target=ReplicatorReader, args=(cb,))
p1.start()
p0 = multiprocessing.Process(name="p0",target=receiveWriter, args=(cb,))
p0.start()
p1a = multiprocessing.Process(name="p1a",target=ReplicatorReader, args=(cb,))
p1a.start()
p2 = multiprocessing.Process(name="p2",target=journalerReader, args=(cb,))
p2.start()
p3 = multiprocessing.Process(name="p3",target=unmarshallerReader, args=(cb,))
p3.start()
p4 = multiprocessing.Process(name="p4",target=consumeReader, args=(cb,))
p4.start()
p1.join()
p0.join()
p1a.join()
p2.join()
p3.join()
p4.join()

最新更新