我已经有一段时间没有使用python的多处理了。我有一个任务可能会从中受益,或者我的假设不正确。我只能为我的用例提供代理。
本质上,我正在模拟实时输入场景。如果原始输入是街景的视频,则我的输入将是帧到帧的唯一标识和跟踪车辆的状态,例如state_0: v_id=0; x=0; y=1; vx=2.5; vy=-3
.
这些状态将一次一个地出现,而不是一帧一帧地出现。因此,如果上面的例子来自第 1 帧,如果在第 2 帧中跟踪车辆 0 并拾取新车辆,我将得到:
state_1: v_id=0; x=2.5; y=-2; vx=3; vy=-2.5
state_2: v_id=1; x=5.6; y=3; vx=-1; vy=0
没有帧信息,只有状态一个又一个状态。
状态由一个带有自定义迭代器的State
对象表示,该迭代器返回v_id
、[x, y, vx, vy]
。
我想使用给定车辆的先前信息对这些状态进行一些分析。以下是更多代理代码:
self.analysis = {}
def estimate(self):
for v_id, state in self.State:
if v_id not in self.analysis.keys():
self.analysis[v_id] = state
state_estimate = self.do_something(self.analysis[v_id], state)
self.analysis[vid].append(state_estimate)
我所拥有的有效,但我想知道我是否可以使用多处理来改进它。我的想法是,我可以将所有状态v_id=0
发送到进程 1,将所有状态v_id=1
发送到进程 2,依此类推。
由于do_something()
函数占用了大量时间,我认为如果State
迭代器将状态分发给多个进程会更快。
我相信迭代器将是一个瓶颈,因为我不允许自己"展望"任何未来的状态。但是,我是否可以设置 16 个流程,并根据当前状态的v_id
,让给定流程执行状态估计分析?多处理在这里合适吗?
我知道这行不通,但这是一些东西。
我正在努力解决如何将任务提供给工人,如果他们都运行相同的可调用但具有不同的v_id
和state
。
第一次编辑:我认为这更接近目标,我为每个进程都有一个队列。state
根据v_id
添加到队列中。还在试图找出工人。
第二次编辑:也许我同时需要输入和输出队列?也许我根本不需要跟踪流程?试图以这些示例为基础。
正在进行的解决方案:
self.analysis = {}
self.queues = {}
self.n_workers = multiprocessing.cpu_count()
for worker in range(self.n_workers):
q_i = multiprocessing.Queue()
q_o = multiprocessing.Queue()
multiprocessing.Process(target=self.worker, args=(q_i, q_o), name=str(worker)).start()
self.queues[v_id] = {'input': q_i, 'output': q_o}
def worker(self, input_queue, output_queue):
??? Maybe something here takes the place of self.analysis ???
def loop_states(self):
for v_id, state in self.State:
if v_id not in self.analysis.keys():
self.analysis[v_id] = state
queue = self.queues[v_id % self.n_workers]['input']
queue.put(self.estimate, (v_id, state))
def estimate(self, v_id, state):
state_estimate = self.do_something(self.analysis[v_id], state)
self.analysis[vid].append(state_estimate)