我不想共享,而只是从一个进程向另一个进程发送DataFrame
。
主DataFrame
被切割成若干块,每一块都在其自己的CPU核心上由一个单独的进程(在蟒蛇multiprocessing
的意义上)处理。在";"孩子";进程完成后,它们应该发回生成的数据帧,以便再次将它们连接在一起。
但在我的示例中,我到达了脚本的END
,但DataFrames从未通过multiprocessing.Queue
对象发回。
#!/usr/bin/env python3
import multiprocessing
import pandas as pd
def worker(df, queue):
print(multiprocessing.current_process())
# create new column
df['X'] = df.b + '-' + df.c
# modify existing column
df.d = df.d.apply(lambda x: x.upper())
# send it back to main process
queue.put(df) # tried .copy() also!
if __name__ == '__main__':
print(pd.__version__)
# initial data
df = pd.DataFrame({
'a': ['A', 'A', 'B', 'B'],
'b': list('XXXX'),
'c': list('6218'),
'd': ['zwei', 'zwei', 'vier', 'neuen']
})
# slice the data frame
df_parts = [
df.iloc[:2].copy(),
df.iloc[2:].copy()
]
processes = []
queue = multiprocessing.Queue()
for i in range(len(df_parts)):
p = multiprocessing.Process(target=worker,
args=(df_parts[i], queue) )
processes.append(p)
p.start()
for p in processes:
p.join()
while queue.full():
print(queue.get())
print('END')
我知道在multiprocessing.Process
实例之间发送数据是在后台处理的。但我没有收到任何关于这方面的错误。
输出:
1.2.5
<Process name='Process-2' parent=14112 started>
<Process name='Process-1' parent=14112 started>
END
>>>
您有几个问题:
- 根据
Queue.full()
的文档:
如果队列已满,则返回True,否则返回False。由于多线程/多处理语义的原因,这是不可靠的。
所以你应该而不是使用这个方法。
- 您必须在加入子流程后,永远不要尝试针对子流程写入的
multiprocessig.Queue
实例发出get。根据文档:
这意味着,如果您尝试加入该进程,则可能会出现死锁,除非您确定已放入队列的所有项目都已消耗掉。类似地,如果子进程是非守护进程,那么当父进程尝试加入其所有非守护进程的子进程时,它可能会挂起退出。
- 您无法确定正在创建的两个进程将其输出写入已创建的单个输出队列的顺序。如果您想确保以正确的顺序获得输出,请为每个进程创建一个单独的输出队列实例。这也简化了
get
的处理。如果您确实想使用一个输出队列,那么您知道每个进程都在写一条消息,并且您有N个进程,所以只需发出N个get
调用,就可以完成对队列的处理
#!/usr/bin/env python3
import multiprocessing
import pandas as pd
def worker(df, queue):
print(multiprocessing.current_process())
# create new column
df['X'] = df.b + '-' + df.c
# modify existing column
df.d = df.d.apply(lambda x: x.upper())
# send it back to main process
queue.put(df) # tried .copy() also!
if __name__ == '__main__':
print(pd.__version__)
# initial data
df = pd.DataFrame({
'a': ['A', 'A', 'B', 'B'],
'b': list('XXXX'),
'c': list('6218'),
'd': ['zwei', 'zwei', 'vier', 'neuen']
})
# slice the data frame
df_parts = [
df.iloc[:2].copy(),
df.iloc[2:].copy()
]
processes = []
queues = []
for i in range(len(df_parts)):
queue = multiprocessing.Queue()
queues.append(queue)
p = multiprocessing.Process(target=worker,
args=(df_parts[i], queue) )
processes.append(p)
p.start()
for queue in queues:
print(queue.get())
for p in processes:
p.join()
print('END')
打印:
1.3.0
<Process name='Process-1' parent=7748 started>
<Process name='Process-2' parent=7748 started>
a b c d X
0 A X 6 ZWEI X-6
1 A X 2 ZWEI X-2
a b c d X
2 B X 1 VIER X-1
3 B X 8 NEUEN X-8
END
使用一个输出队列:
processes = []
queue = multiprocessing.Queue()
for i in range(len(df_parts)):
p = multiprocessing.Process(target=worker,
args=(df_parts[i], queue) )
processes.append(p)
p.start()
for _ in range(len(processes)):
print(queue.get())
for p in processes:
p.join()