假设我有一个函数,当调用它时,它会给我大量来自设备的数据。我想把这些数据累加到内存缓冲区中。当缓冲区达到任意选择的长度时,另一个函数跳入,获取缓冲区并对其执行一些操作。数据流不应该停止,所以我正在考虑在一个单独的进程或线程中运行这个函数。
一个简短的草稿应该是这样的(没有实现线程或多处理):
def get_data():
getting data...
def get_bufferchunk():
get the current buffer and do something...
buffer = []
while True:
data = get_data()
buffer.append(data)
if len(data) == 100000:
get_bufferchunk(buffer)
buffer = []
所以函数get_bufferchunk()
应该在并行进程中运行,这样while循环可以继续运行,数据流不会被阻塞。
我的问题,如果这是一个合理的想法,如果是这样一个会使用多处理或线程为此目的?我只看到了问题,必须确保函数内的代码执行速度快于缓冲区重新填充到1000000个值。
Michael Butscher提供的评论,如果我理解正确的话,建议您将buffer
对象实现为共享内存类型,也许是某种合适类型的multiprocessing.sharedctypes.RawArray
。通过这种方式,数据可以在进程之间非常有效地传递,因为数据驻留在共享内存中,并且两个进程都可以直接"看到"数据。无需将数据从一个地址空间传递到另一个地址空间。但是RawArray
类只支持它可以保存的有限数量的数据类型,这对您来说可能是问题,也可能不是问题。此外,您还需要想出一种方法,允许收集数据的进程在另一个进程处理数据时继续向数组添加数据。这会变得相当复杂。
所以我将首先尝试传递给每个进程一个multiprocessing.Queue
实例。当get_bufferchunk
获得一个完整的缓冲区时(这可能只是一个普通的列表),它然后对队列执行缓冲区的put
。另一个进程,例如由工作函数process_bufferchunk
表示,循环调用队列上的get
来检索下一个缓冲区并处理它。这取决于buffer
持有的数据类型、process_bufferchunk
进行的处理类型、通过get_data
进入的数据速度等。我会相应地调整buffer
的长度触发put
进入队列。
总的思路是:
from multiprocessing import Process, Queue
def get_data():
return 1
def get_bufferchunk(q):
# get the current buffer and do something...
buffer = []
# So we eventually terminate:
#while True:
for _ in range(600_000):
data = get_data()
buffer.append(data)
if len(buffer) == 100_000: # or a smaller chunk, for example 1000
q.put(buffer)
buffer = []
# tell other process that there is no more data coming
q.put(None)
def process_bufferchunk(q):
while True:
buffer = q.get()
if buffer is None: # Sentinel?
# Yes: Signal for us to terminate:
break
# For demo purposes just print the length:
print('Buffer length = ', len(buffer))
def main():
q = Queue()
p1 = Process(target=get_bufferchunk, args=(q,))
p2 = Process(target=process_bufferchunk, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
# Required for Windows:
if __name__ == '__main__':
main()
打印:
Buffer length = 100000
Buffer length = 100000
Buffer length = 100000
Buffer length = 100000
Buffer length = 100000
Buffer length = 100000