我可以使用asyncio对多处理进行读写吗.管道



我需要在Python中的进程之间进行通信,并且在每个进程中使用asyncio进行并发网络IO。

目前,我在进程之间使用multiprocessing.Pipesendrecv的数据量非常大,但我在asyncio之外这样做,我相信我因此在IO_WAIT上花费了大量的cpu时间。

似乎asyncio可以也应该用于处理进程之间的管道IO,但是除了管道STDIN/STDOUT之外,我找不到任何其他示例。

从我读到的内容来看,我似乎应该向loop.connect_read_pipe(PROTOCOL_FACTORY, PIPE)注册管道,同样也应该进行写入。然而,我不理解protocol_factory的用途,因为它与multiprocessing.Pipe有关。甚至还不清楚我是否应该创建一个multiprocessing.Pipe,或者我是否可以在asyncio中创建一个管道。

multiprocessing.Pipe使用高级multiprocessing.Connection模块,该模块pickle和unpickle Python对象,并在后台传输额外的字节。如果您想使用loop.connect_read_pipe()从其中一个管道读取数据,您必须自己重新实现所有这些。

multiprocessing.Pipe读取而不阻塞事件循环的最简单方法是使用loop.add_reader()。考虑以下示例:

import asyncio
import multiprocessing

def main():
read, write = multiprocessing.Pipe(duplex=False)
writer_process = multiprocessing.Process(target=writer, args=(write,))
writer_process.start()
asyncio.get_event_loop().run_until_complete(reader(read))

async def reader(read):
data_available = asyncio.Event()
asyncio.get_event_loop().add_reader(read.fileno(), data_available.set)
if not read.poll():
await data_available.wait()
print(read.recv())
data_available.clear()

def writer(write):
write.send('Hello World')

if __name__ == '__main__':
main()

使用较低级别的os.pipe创建的管道不会像multiprocessing.Pipe中的管道那样添加任何额外的内容。因此,我们可以将os.pipeloop.connect_read_pipe()一起使用,而无需重新实现任何内部工作。这里有一个例子:

import asyncio
import multiprocessing
import os

def main():
read, write = os.pipe()
writer_process = multiprocessing.Process(target=writer, args=(write,))
writer_process.start()
asyncio.get_event_loop().run_until_complete(reader(read))

async def reader(read):
pipe = os.fdopen(read, mode='r')
loop = asyncio.get_event_loop()
stream_reader = asyncio.StreamReader()
def protocol_factory():
return asyncio.StreamReaderProtocol(stream_reader)
transport, _ = await loop.connect_read_pipe(protocol_factory, pipe)
print(await stream_reader.readline())
transport.close()

def writer(write):
os.write(write, b'Hello Worldn')

if __name__ == '__main__':
main()

这段代码帮助我了解了如何使用loop.connect_read_pipe

aiopipe似乎可以随心所欲!它可以与内置的multiprocessing模块一起使用,并为常规阻塞管道提供类似的API。

最新更新