使用多处理队列处理块中的元素



我有一个多处理队列;使用一个SENTINEL值(一个字符串)来表示队列的结束。

aq =  Queue() 

........................

队列中的实例属于A类:

class A:
id: str
desc: str

在一个函数中,我从队列aq中获取元素并在块中处理它们。第一个元素(如果只有一个)可以是SENTINEL,不需要处理。…

def process:
chunk_data = []
all = [
item = aq.get()
if not isinstance(item, A):
return
chunk_data.append(item.id)
while item != SENTINEL:
# start process in chunks
# adding elements to the chunk list until is full
while len(chunk_data) < CHUNK_MAX_SIZE: # 50
item = aq.get()
if item == SENTINEL:
break
chunk_data.append(item.id)
# the chunk list is full start processing
chunk_process_ids = process_data(chunk_data) # process chunks
all.extend(chunk_process_ids)
# empty chunk list and start again
chunk_data.clear()  

该函数按预期工作,但我认为代码有些复杂。我在寻找一个简单,清晰的版本。

为了遵循DRY原则,我认为这是一个没有重复逻辑的更清晰的代码版本。请注意,当输入值的类型不符合预期时,通过引发异常来处理错误通常比简单地返回要好。

def process():
all = []
while True:
chunk_data = []
for _ in range(CHUNK_MAX_SIZE):
if (item := aq.get()) == SENTINEL:
break
assert isinstance(item, A)
chunk_data.append(item.id)
if chunk_data:
all.extend(process_data(chunk_data))
if len(chunk_data) < CHUNK_MAX_SIZE:
break

如果你不需要验证每个项目是否为A类型(如果你控制了排队项目的代码,你不应该这样做),你也可以用iteritertools.islice进一步清理代码:

from itertools import islice
from operator import attrgetter
def process():
all = []
data = iter(aq.get, SENTINEL)
while True:
chunk_data = list(map(attrgetter('id'), islice(data, CHUNK_MAX_SIZE)))
if chunk_data:
all.extend(process_data(chunk_data))
if len(chunk_data) < CHUNK_MAX_SIZE:
break

感谢@KellyBundy提供了一个更简洁的版本,如下所述:

from itertools import islice
from operator import attrgetter
def process():
all = []
data = iter(aq.get, SENTINEL)
ids = map(attrgetter('id'), data)
while chunk_ids := list(islice(ids, CHUNK_MAX_SIZE)):
all += process_data(chunk_ids)

我倾向于这样组织代码:

def get_chunks():
chunk_data = []
while True:
item = aq.get()
if item == SENTINEL: # or: if not isinstance(item A):
break
chunk_data.append(item.id)
if len(chunk_data) == CHUNK_MAX_SIZE:
yield chunk
chunk_data = []
# Do we have a "small" chunk?
if chunk_data:
yield chunk_data
def process():
all = []
for chunk_data in get_chunks():
all.extend(process_data(chunk_data))

但是对于"writer"使用get_chunks,以便将已经生成的块写入队列。这将导致更少(但更大)的队列访问,通常将更有效。

这里是一个例子,我假设所有的A实例都在一个列表中,list_of_a_instances:

def get_chunks():
chunk_data = []
for item in list_of_a_instances: # a list of all the A instances, for example
chunk_data.append(item.id)
if len(chunk_data) == CHUNK_MAX_SIZE:
yield chunk
chunk_data = []
# Do we have a "small" chunk?
if chunk_data:
yield chunk_data
def process():
all = []
while True:
chunk_data = aq.get()
if chunk_data == SENTINEL:
break
all.extend(process_data(chunk_data))
def writer():
for chunk_data in get_chunks():
aq.put(chunk_data)
aq.put(SENTINEL)

最新更新