问题:如何使用multiprocessing
或concurrent.futures
将数据帧的列传递到每行的函数中?
详细信息:对于df
中的每一行,我希望将其列leader
和years
传递到函数print_sentences()
中。我想以并行的方式使用该函数,其中每一行都是异步打印的。例如,我想使用concurrent.futures.Executor.map
。
它需要在Python 3.6.
中
Reprex:我的实际问题在计算上很苛刻,所以这里有一个简化的reprex
:
import pandas as pd
import numpy as np
import concurrent.futures
df = pd.DataFrame(np.array([["Larry", 3, "Germany"], ["Jerry", 5, "Sweden"], ["George", 12, "UK"]]),
columns=['leader', 'years', 'score'])
def print_sentences(df):
print(df["leader"] + " has been leader for " + df["years"] + " years")
print_sentences(df)
背景:与此问题相关的其他问题似乎涉及dataframe
以外的对象类型。
当我读到.csv
数据帧时,我的特定问题就开始了。我想把这个数据帧的每一行的列传递到某个函数中。我的实际函数(对于reprex来说大大简化了(在计算上要求很高。它抓取数据并将其保存到.json
。因此,每一行都执行不同的查询(例如,输入不同领导者的姓名和分数(。
为了优化这一点,我希望行以并行的方式映射到函数中。
我已经用上面的reprex
简化了我的问题。
感谢您提前提供的帮助。
试试这个,
#为反映您的用例而进行的编辑。
import pandas as pd
import numpy as np
from multiprocessing import cpu_count, Pool
cores = cpu_count() #Number of CPU cores on your system
partitions = cores #Define as many partitions as you want
def parallelize(data, func):
data_split = np.array_split(data, partitions)
pool = Pool(cores)
data = pd.concat(pool.map(func, data_split))
pool.close()
pool.join()
return data
def print_sentences(cols):
leader, years = cols[0], cols[1]
print(leader + " has been leader for " + years + " years")
df = pd.DataFrame(np.array([["Larry", 3, "Germany"], ["Jerry", 5, "Sweden"],
["George", 12, "UK"]]),
columns=['leader', 'years', 'score'])
data = df.copy()
data = parallelize(data, print_sentences)
data.apply(print_sentences, axis=1)