Python扭曲:如何使反应器轮询一个管道和互联网套接字



我想使用Twisted作为客户端/服务器管理器,它是常规Python对象的一部分。我试图实现的解决方案是使用multiprocessing.Process在Twisted自己的进程中隔离Twisted,并通过multiprocessing.Pipe与该进程通信。

我已经用Twisted对客户端/服务器逻辑进行了编码,但现在我只能将multiprocessing.Pipe通信与reactor连接起来。

我是Twisted的初学者,所以我可能错过了一些显而易见的东西,但根据我对reactor如何工作的了解,我想reactor应该以某种方式从我的multiprocessing.Pipe以及它似乎已经很好地处理的套接字中进行轮询。所以我的问题是,我如何让反应堆在它已经在做的事情的基础上听我的multiprocessing.Pipe

到目前为止,我的代码看起来像这样:

class ServerProtocol(Protocol):
def __init__(self, server):
self._server = server
def connectionMade(self):
pass
def connectionLost(self, reason):
pass
def dataReceived(self, data):
pass

class ServerProtocolFactory(Factory):
protocol = ServerProtocol
def __init__(self, server):
self.server = server
def buildProtocol(self, addr):
return ServerProtocol(self.server)

class Server:
def __init__(self):
pass
def run(self, pipe):
"""
This is called in its own process
"""
from twisted.internet import reactor
endpoint = TCP4ServerEndpoint(reactor, self._port)
endpoint.listen(ServerProtocolFactory(self))
reactor.run()  # main Twisted reactor loop

class MyObject:
def __init__(self):
self._pipe = Pipe()
self._server = Server()
self._p = Process(target=self._server.run, args=(self._pipe, ))
self._p.start()
def stop(self):
# I want to send some stop command through the Pipe here
self._p.join()

if __name__ == "__main__":
obj = MyObject()
# do stuff here
obj.stop()

我不知道Twisted是否会以这种方式运行(即作为multiprocessing.Process的目标(。不过,让我们假设它会的。

CCD_ 7被记录为返回CCD_ 8对象的两元组。multiprocessing.Connection被记录为具有返回Connection使用的文件描述符(或句柄(的fileno方法。

如果它是一个文件描述符,那么可能有一个非常简单的路径可以将它与Twisted reactor集成。大多数Twisted电抗器实现IReactorFDSet,其具有接受IReadDescriptor值的addReader方法。

Connection不完全是IReadDescriptor,但它很容易被调整为

from attrs import define
from multiprocessing import Connection
from twisted.python.failure import Failure
@define
class ConnectionToDescriptor:
_conn: Connection
def fileno(self) -> int:
return self._conn.fileno()
def doRead(self) -> None:
some_data = self._conn.recv()
# Process some_data how you like
def connectionLost(self, reason: Failure) -> None:
self._conn.close()

如果您将其包含在读取的Connection中,然后将结果传递给reactor.addReader,则反应器将使用fileno来确定要监控的准备情况,并在有内容要读取时调用doRead

如果您还希望对向父进程发送字节提供reactor友好支持,则可以对管道的写入端应用类似的处理。

相关内容

  • 没有找到相关文章

最新更新