我需要在Python中的进程之间进行通信,并且在每个进程中使用asyncio
进行并发网络IO。
目前,我在进程之间使用multiprocessing.Pipe
到send
和recv
的数据量非常大,但我在asyncio
之外这样做,我相信我因此在IO_WAIT
上花费了大量的cpu时间。
似乎asyncio
可以也应该用于处理进程之间的管道IO,但是除了管道STDIN/STDOUT之外,我找不到任何其他示例。
从我读到的内容来看,我似乎应该向loop.connect_read_pipe(PROTOCOL_FACTORY, PIPE)
注册管道,同样也应该进行写入。然而,我不理解protocol_factory
的用途,因为它与multiprocessing.Pipe
有关。甚至还不清楚我是否应该创建一个multiprocessing.Pipe
,或者我是否可以在asyncio
中创建一个管道。
multiprocessing.Pipe
使用高级multiprocessing.Connection
模块,该模块pickle和unpickle Python对象,并在后台传输额外的字节。如果您想使用loop.connect_read_pipe()
从其中一个管道读取数据,您必须自己重新实现所有这些。
从multiprocessing.Pipe
读取而不阻塞事件循环的最简单方法是使用loop.add_reader()
。考虑以下示例:
import asyncio
import multiprocessing
def main():
read, write = multiprocessing.Pipe(duplex=False)
writer_process = multiprocessing.Process(target=writer, args=(write,))
writer_process.start()
asyncio.get_event_loop().run_until_complete(reader(read))
async def reader(read):
data_available = asyncio.Event()
asyncio.get_event_loop().add_reader(read.fileno(), data_available.set)
if not read.poll():
await data_available.wait()
print(read.recv())
data_available.clear()
def writer(write):
write.send('Hello World')
if __name__ == '__main__':
main()
使用较低级别的os.pipe
创建的管道不会像multiprocessing.Pipe
中的管道那样添加任何额外的内容。因此,我们可以将os.pipe
与loop.connect_read_pipe()
一起使用,而无需重新实现任何内部工作。这里有一个例子:
import asyncio
import multiprocessing
import os
def main():
read, write = os.pipe()
writer_process = multiprocessing.Process(target=writer, args=(write,))
writer_process.start()
asyncio.get_event_loop().run_until_complete(reader(read))
async def reader(read):
pipe = os.fdopen(read, mode='r')
loop = asyncio.get_event_loop()
stream_reader = asyncio.StreamReader()
def protocol_factory():
return asyncio.StreamReaderProtocol(stream_reader)
transport, _ = await loop.connect_read_pipe(protocol_factory, pipe)
print(await stream_reader.readline())
transport.close()
def writer(write):
os.write(write, b'Hello Worldn')
if __name__ == '__main__':
main()
这段代码帮助我了解了如何使用loop.connect_read_pipe
。
aiopipe
似乎可以随心所欲!它可以与内置的multiprocessing
模块一起使用,并为常规阻塞管道提供类似的API。