Python:流入快速使用者的慢速生成器会耗尽缓冲区并提前终止



假设我有一个生成器,它可以缓慢地创建我的数据流:

import threading
import time
class SlowStreamSource():
def __init__(self):
self.buffer = ""
self.gen_data = threading.Thread(target=self.generate_stream)
self.gen_data.start()
def generate_stream(self):
i = 0
while i < 10:
self.buffer += str(i)
# Other processing happens
time.sleep(0.1)
i += 1
self.buffer += "-Stream Finished-"
def read(self, hint = -1):
if hint is None or hint < 0:
result = self.buffer
self.buffer = ""
else:
result = self.buffer[:hint]
result = self.buffer[hint:]
return result

这些数据被发送到比生成器快得多的消费者遵循调用read()直到没有更多数据并退出的标准做法

import time
class FastStreamDestination():
def __init__(self, source):
self.source = source
def process_stream(self):
while True:
data = self.source.read()
if not data:
break
print(f'read "{data}"')
# Other processing happens
time.sleep(0.05)

(我无法控制消费者。这是亚马逊的boto3 upload_fileobj,但我已经查看了他们的代码,以确定这基本上就是它的功能。(

当我将生成器输入消费者时,它会很快耗尽缓冲区,得出流已完成并提前退出的结论。

src = SlowStreamSource()
dst = FastStreamDestination(src)
dst.process_stream()

产生read "0",但我最终需要类似的东西

read "0"
read "1"
read "2"
read "3"
read "4"
read "5"
read "6"
read "7"
read "8"
read "9"
read "-Stream Finished-"

有没有办法确保我的消费者从我的生成器中读取整个流,记住我不能有意义地加快生成器的速度,也不能以任何方式修改消费者?

好吧,在同事的帮助下,我想我找到了解决方案。

我的生成器可以知道是否有更多的数据可以提供,即使它还没有准备好数据。由于它是一个类似File的对象,这意味着它有一个close函数,当我确信数据全部生成时,可以调用它。

有了这种意识,我可以根据需要制作读取功能块,以确保它有一些数据要返回。

import threading
import time
class SlowStreamSource():
def __init__(self):
self.buffer = ""
self.done = False
self.gen_data = threading.Thread(target=self.generate_stream)
self.gen_data.start()
self.closed = False
def generate_stream(self):
i = 0
while i < 10:
self.buffer += str(i)
# Other processing happens
time.sleep(0.1)
i += 1
self.buffer += "-Stream Finished-"
self.closed = True
def read(self, hint = -1):
while not self.closed and len(self.buffer) == 0:
time.sleep(0.1)
if hint is None or hint < 0:
result = self.buffer
self.buffer = ""
else:
result = self.buffer[:hint]
result = self.buffer[hint:]
return result

最新更新