如何在涉及写入文件或 .gzip 文件的多生产者-消费者流中正确使用 asyncio?



我正在实现一个python模块,该模块采用具有三个列表(xyval(的元组,并根据给定的比率对它们进行子采样。我的做法是否正确?

  1. 是否异步写入磁盘?
  2. 我是否可以让许多生产者和使用者生成数据并将其写入同一个输出文件?
  3. 当我将此代码与单个线程的朴素实现进行比较时,它们在运行时方面的执行类似。

import bisect
import numpy as np
import gzip
import asyncio
class SignalPDF:
def __init__(self, inputSignal):
self.x         = inputSignal[0][:]
self.y         = inputSignal[1][:]
self.vals      = inputSignal[2][:]
self.valCumsum = np.cumsum(self.vals)
self.totalSum  = np.sum(self.vals)
self.N         = len(self.vals)
class SignalSampler:
def __init__(self, inputSignal, ratio=1.0):
self.signalPDF = SignalPDF(inputSignal)
self.Q         = asyncio.Queue()
self.ratio     = float(ratio)
self.N         = int(self.signalPDF.N/self.ratio)
self.sampledN  = 0
async def randRead(self):
while self.sampledN < self.N:
i = np.random.randint(self.signalPDF.totalSum, size=1, dtype=np.uint64)[0]
self.sampledN += 1 
cell = bisect.bisect(self.signalPDF.valCumsum, i)
yield (self.signalPDF.x[cell], self.signalPDF.y[cell], int(self.signalPDF.vals[cell]))
async def readShortFormattedLine(self):
async for read in self.randRead():
x = read[0]; y = read[1]; val = read[2]; 
yield '{0} {1} {2}'.format(x,y,val)
async def populateQueue(self):
async for i in self.readShortFormattedLine():
await self.Q.put(i)
await self.Q.put(None)
async def hanldeGzip(self, filePath):
with gzip.open(filePath, 'wt') as f:
while True:
item = await self.Q.get()
if item is None:
break
f.write('{0}n'.format(item))
f.flush()
async def hanldeFile(self, filePath):
with open(filePath, 'w+') as f:
while True:
item = await self.Q.get()
if item is None:
break
f.write('{0}n'.format(item))
f.flush()
def main(gzip, outputFile):
x=[]; y=[];val=[]
for i in range(100):
for j in range(100):
x.append(i)
y.append(j)
val.append(np.random.randint(0,250))
loop      = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
mixer = SignalSampler(inputSignal=[x,y,val], ratio=2.0)
futures = []
if gzip:
futures = [mixer.hanldeGzip(outputFile), mixer.populateQueue()]
else:
futures = [mixer.hanldeFile(outputFile), mixer.populateQueue()] 
tasks   = asyncio.wait(futures, loop=loop)
results = loop.run_until_complete(tasks)
loop.close()
main(gzip=False, outputFile='/tmp/a.txt')
main(gzip=True, outputFile='/tmp/a.txt.gz')

asyncio的工作原理

让我们考虑一个发出两个 Web 请求的任务。

同步版本:

  1. 发送请求 1
  2. 等待答案 1 秒。
  3. 发送请求 2
  4. 等待答案 1 秒。
  5. 两个请求都在2 秒内完成。

异步版本:

  1. 发送请求 1
  2. 无需等待,而是立即发送请求 2
  3. 等待答案超过 1 秒。
  4. 两个请求都在1 秒内完成。

asyncio允许您编写实际上像第二个异步版本一样工作的程序,而您的代码看起来与(直观的(第一个版本非常相似。

请注意这里的重要一点:异步版本更快的唯一原因是它立即启动另一个并发操作,而不是等待第一个完全完成。它与线程无关,asyncio单个主线程中工作。


磁盘 I/O 呢?

您的硬件可以并行读/写两个文件吗?

如果您有一个物理硬盘,那么可能没有:它有一个物理"针",可以同时读取/写入单个数据。异步方法对您没有帮助。

如果您有多个磁盘,情况可能会有所不同。虽然我有想法 OS/asyncio是否可以并行处理多个磁盘(可能不是(。

假设您希望硬件和操作系统支持多个磁盘 I/O。它可能仅在使用多个线程或进程进行操作时才有效:

  • 模块 aiofiles 使用线程来处理文件 - 您可以尝试一下
  • 要使用ProcessPoolExecutorasyncio流程,您可以使用run_in_executor
  • 如下所示

使用进程甚至线程也有可能纯粹由于并行化相关的 CPU 密集型操作而增加磁盘 I/O,但我不知道情况是否如此以及它有多大好处(与磁盘 I/O 相比可能不多(。

相关内容

  • 没有找到相关文章

最新更新