我已经在相关Python 3.9.2文档上对以下测试程序进行了建模,该文档涉及远程进程之间的数据同步。不过,据我所知,它实际上并不起作用,所以我想有一些我不知道的事情。文档中没有明确说明SyncManager对象在远程进程中的部署,但它们毕竟是BaseManager子类的实例,因此必须假设同样的技术也应该有效。
在下面的代码之后,shell输出显示了三个并发调用,这些调用大概说明了我遇到的问题。尽管正在与服务器建立连接,但dict并没有同步。问题是:为什么?
#!/usr/bin/env python3
# <zteeq.py>
import multiprocessing as mp
import multiprocessing.shared_memory as sm
import multiprocessing.managers as mgrs
import os, sys
#######################################################
class CatalogManager( mgrs.SyncManager): pass
CatalogManager.register( 'get_catalog', dict, mgrs.DictProxy)
#######################################################
class ShareCatalog():
#######################################################
def __init__( self,
catalogManagerAddress,
catalogManagerAuthkey,
**kwargs
):
self.catalogManagerAddress = catalogManagerAddress
self.catalogManagerAuthkey = catalogManagerAuthkey
#######################################################
def start( self):
self.catalogManager = CatalogManager(
self.catalogManagerAddress,
self.catalogManagerAuthkey,
)
try:
self.catalogManager.connect()
print( 'connected self.catalogManager')
except ConnectionRefusedError:
catalogManagerServer = self.catalogManager.get_server()
print( 'starting self.catalogManager')
catalogManagerServer.serve_forever()
self.catalog = self.catalogManager.get_catalog()
###
print( 'pid %d: first stop: %r' % ( os.getpid(), str( self.catalog)))
input()
###
if 'streams' not in self.catalog:
print( 'adding streams')
self.catalog[ 'streams'] = {}
###
print( 'pid %d: second stop: %r' % ( os.getpid(), str( self.catalog)))
input()
###
#######################################################
if __name__ == '__main__':
mp.set_start_method( 'spawn')
shareCatalog = ShareCatalog(
( '127.0.1.1', 43210),
b'abc',
)
shareCatalog.start()
#</zteeq.py>
在第一个shell中,SyncManager服务器启动:
# ./zteeq.py
starting self.catalogManager
离开运行,我在第二个shell中再次启动程序:
# ./zteeq.py
connected self.catalogManager
pid 2486196: first stop: '{}'
adding streams
pid 2486196: second stop: "{'streams': {}}"
到目前为止,一切都很好。我让它继续运行,并调用第三次。但第三次调用对第二次调用的作用一无所知;不存在";流";共享字典中的密钥:
# ./zteeq.py
connected self.catalogManager
pid 2492338: first stop: '{}'
我错过了什么?
(Python 3.9.2((Linux 5.10.0-4-amd64#1 SMP Debian 5.10.19-1(2021-03-02(x86_64 GNU/Linux(
备注:一般来说,文档似乎假设所有SyncManager对象都将通过名为"的快捷方式创建;多处理。经理(("其不提供远程套接字通信的规范。我认为这样的对象是由所有将使用它的进程分叉继承的,正如我迄今为止发现的所有示例所示。但这不是我想做的。
我所做的一些推论被证明是不正确的。我希望下面的解决方案看起来不那么笨重和多余,但我认为最好以这种方式发布,因为冗余本身就是信息。(冗余的必要性让我感到惊讶,我仍在考虑。(文档中建议将包含的对象保留为非托管对象,然后简单地调整托管容器,以便告诉管理器更新客户端,但这一建议并不奏效。我不知道为什么;可能是另一个错误的推论或误解。不管怎样,以下内容确实有效。
#!/usr/bin/env python3
import multiprocessing as mp
import multiprocessing.shared_memory as sm
import multiprocessing.managers as mgrs
import os, sys
#######################################################
#######################################################
class ShareCatalog():
#######################################################
def __init__( self,
catalogManagerAddress,
catalogManagerAuthkey,
**kwargs
):
self.catalogManagerAddress = catalogManagerAddress
self.catalogManagerAuthkey = catalogManagerAuthkey
#######################################################
def start( self, server):
class CatalogManager( mgrs.SyncManager): pass
if server:
catalogDict = {
'streams': {},
}
CatalogManager.register( 'get_catalog', lambda:catalogDict, mgrs.DictProxy)
CatalogManager.register( 'get_streams', lambda:catalogDict[ 'streams'], mgrs.DictProxy)
self.catalogManager = CatalogManager(
self.catalogManagerAddress,
self.catalogManagerAuthkey,
)
catalogManagerServer = self.catalogManager.get_server()
print( 'starting self.catalogManager')
catalogManagerServer.serve_forever()
else: ## not server
CatalogManager.register( 'get_catalog')
CatalogManager.register( 'get_streams')
self.catalogManager = CatalogManager(
self.catalogManagerAddress,
self.catalogManagerAuthkey,
)
self.catalogManager.connect()
print( 'connected self.catalogManager')
self.catalog = self.catalogManager.get_catalog()
self.streams = self.catalogManager.get_streams()
###
ctr = -1
while True:
print( 'pid %d: first stop: %r' % ( os.getpid(), str( self.catalog)))
input()
ctr += 1
self.catalog.setdefault(
( os.getpid(), ctr,),
None,
)
self.streams[ ctr] = None
print( 'pid %d: second stop: %r' % ( os.getpid(), str( self.catalog)))
input()
###
#######################################################
if __name__ == '__main__':
mp.set_start_method( 'spawn')
shareCatalog = ShareCatalog(
( '127.0.1.1', 43210),
b'abc',
)
shareCatalog.start( eval( sys.argv[ 1]))
和以前一样,第一次调用启动服务器:
# ./zteeq.py True ## True means "be the server"
starting self.catalogManager
第二次调用:
# ./zteeq.py False
connected self.catalogManager
pid 2767634: first stop: "{'streams': {}}"
pid 2767634: second stop: "{'streams': {0: None}, (2767634, 0): None}"
pid 2767634: first stop: "{'streams': {0: None}, (2767634, 0): None}"
pid 2767634: second stop: "{'streams': {0: None, 1: None}, (2767634, 0): None, (2767634, 1): None}"
让第二个调用继续运行,下面是第三个调用:
# ./zteeq.py False
connected self.catalogManager
pid 2767704: first stop: "{'streams': {0: None, 1: None}, (2767634, 0): None, (2767634, 1): None}"
pid 2767704: second stop: "{'streams': {0: None, 1: None}, (2767634, 0): None, (2767634, 1): None, (2767704, 0): None}"
pid 2767704: first stop: "{'streams': {0: None, 1: None}, (2767634, 0): None, (2767634, 1): None, (2767704, 0): None}"
回到第二次调用,我们看到第三次调用在按下Enter键后的变化:
# ./zteeq.py False
connected self.catalogManager
pid 2767634: first stop: "{'streams': {}}"
pid 2767634: second stop: "{'streams': {0: None}, (2767634, 0): None}"
pid 2767634: first stop: "{'streams': {0: None}, (2767634, 0): None}"
pid 2767634: second stop: "{'streams': {0: None, 1: None}, (2767634, 0): None, (2767634, 1): None}"
pid 2767634: first stop: "{'streams': {0: None, 1: None}, (2767634, 0): None, (2767634, 1): None, (2767704, 0): None}"
好到可以使用,无论如何!