我希望根据列'col1'的值将数据帧拆分为多个数据帧,并使用多处理将拆分后的数据帧分配给每个核心。
数据帧:
col col1
0 0 a
1 1 a
2 2 b
3 3 a
4 4 c
5 5 c
6 6 a
7 7 c
8 8 b
9 9 a
import multiprocessing
import pandas as pd
import numpy as np
from multiprocessing import Pool, cpu_count
cores = cpu_count()
partitions = cores
df = pd.DataFrame({'col': [0,1,2,3,4,5,6,7,8,9],
'col1':['a','a','b','a','c','c','a','c','b','a']})
def parallelize_dataframe(df, func):
data = np.array_split(df, partitions)
print(data)
pool = Pool(cores)
df = pd.concat(pool.map(func, data))
pool.close()
pool.join()
return df
def square(x):
return x**2
def test_func(data):
data["square"] = data["col"].apply(square)
return data
test = parallelize_dataframe(df, test_func)
数据帧的预期分割
col col1
0 0 a
1 1 a
3 3 a
6 6 a
9 9 a
和
col col1
2 2 b
8 8 b
类似于列"col1"中的唯一值
然后使用多重处理将分割的数据帧分配给每个核心,并对其应用一个函数
请帮助我拆分数据帧,并将其分别分配给每个核心进行并行处理。
import math
import multiprocessing
import pandas as pd
df = pd.DataFrame({'col': [0,1,2,3,4,5,6,7,8,9],'col1':['a','a','b','a','c','c','a','c','b','a']})
num_split_df = math.floor(len(df)/2) # 2 - splits in df
m = multiprocessing.Manager()
q = m.Queue() # use this manager Queue instead of multiprocessing Queue as that causes error
pool_tuple = [(i,q,df_emp[(i * 6):((i + 1) * 6)]) for i in range(num_split)] # 6 - rows in each df
with multiprocessing.Pool(processes=4) as pool: # number of cores
results = pool.starmap(multiprocessing_func, pool_tuple)
def multiprocessing_func(num, q, df):
...