如何在Julia中声明用于并行计算的共享DataFrame



我在一个名为simulation_results的DataFrame上进行了一个大型模拟,我正在尝试并行化并将模拟结果保存在一个名为CC_2的DataFrame中。

并行化循环工作正常。问题是,如果我要将结果存储在数组中,我会在循环之前将其声明为SharedArray。我不知道如何将simulation_results声明为"共享数据框架"。所有处理器都可以在任何地方使用,并且可以修改。

代码片段如下:

addprocs(length(Sys.cpu_info()))
@everywhere begin
using <required packages>
df = CSV.read("/path/data.csv", DataFrame)
simulation_results = similar(df, 0) #I need to declare this as shared and modifiable by all processors 

nsims = 100000
end

@sync @distributed for sim in 1:nsims
nsim_result = similar(df, 0)
<the code which for one simulation stores the results in nsim_result >
append!(simulation_results, nsim_result)
end

问题是,由于simulation_results没有被声明为处理器共享和修改,在循环运行后,它基本上产生一个空的DataFrame,如@everywhere simulation_results = similar(df, 0)中所编码的。

非常感谢任何帮助!谢谢!

Julia中的分布式计算模式比你想做的要简单得多。

你的代码看起来应该或多或少像这样:

df = CSV.read("/path/data.csv", DataFrame)
@everywhere using <required packages>

simulation_results = @distributed (append!) for sim in 1:nsims
<the code which for one simulation stores the results in nsim_result >
nsim_result
end

注意,您不需要在Julia集群中的每个进程上加载df,因为@distributed将确保它是可读的。您也不需要@sync,因为在我的代码中,您将使用聚合器函数(df0)。

一个最小的工作示例(使用addprocs(4)运行):

@everywhere using Distributed, DataFrames
df = DataFrame(a=1:5,b=rand())

和现在的结果:

julia> @distributed (append!) for i in 2:5
DataFrame(bsum=sum(df.b[1:myid()]),c=myid())
end
4×2 DataFrame
Row │ bsum      c
│ Float64   Int64
─────┼─────────────────
1 │ 0.518127      2
2 │ 0.777191      3
3 │ 1.03625       4
4 │ 1.29532       5

只要您的数据框df在您处理的条目中是数字的,您就可以将它作为矩阵来回传递:

mynames = names(df)
matrix = Matrix(df)

然后将矩阵转换为sharearray并计算。然后回到矩阵

dfprocessed = DataFrame(matrix, mynames)

注意,如果数据框架的数据类型不一致,这个方法可能会失败。如果全部为整数或全部为浮点数,则效果最好。您可能必须首先删除非数字列或将其设置为数字级别。

最新更新