我在一个名为simulation_results
的DataFrame上进行了一个大型模拟,我正在尝试并行化并将模拟结果保存在一个名为CC_2的DataFrame中。
并行化循环工作正常。问题是,如果我要将结果存储在数组中,我会在循环之前将其声明为SharedArray
。我不知道如何将simulation_results
声明为"共享数据框架"。所有处理器都可以在任何地方使用,并且可以修改。
代码片段如下:
addprocs(length(Sys.cpu_info()))
@everywhere begin
using <required packages>
df = CSV.read("/path/data.csv", DataFrame)
simulation_results = similar(df, 0) #I need to declare this as shared and modifiable by all processors
nsims = 100000
end
@sync @distributed for sim in 1:nsims
nsim_result = similar(df, 0)
<the code which for one simulation stores the results in nsim_result >
append!(simulation_results, nsim_result)
end
问题是,由于simulation_results
没有被声明为处理器共享和修改,在循环运行后,它基本上产生一个空的DataFrame,如@everywhere simulation_results = similar(df, 0)
中所编码的。
非常感谢任何帮助!谢谢!
Julia中的分布式计算模式比你想做的要简单得多。
你的代码看起来应该或多或少像这样:
df = CSV.read("/path/data.csv", DataFrame)
@everywhere using <required packages>
simulation_results = @distributed (append!) for sim in 1:nsims
<the code which for one simulation stores the results in nsim_result >
nsim_result
end
注意,您不需要在Julia集群中的每个进程上加载df
,因为@distributed
将确保它是可读的。您也不需要@sync
,因为在我的代码中,您将使用聚合器函数(df
0)。
一个最小的工作示例(使用addprocs(4)
运行):
@everywhere using Distributed, DataFrames
df = DataFrame(a=1:5,b=rand())
和现在的结果:
julia> @distributed (append!) for i in 2:5
DataFrame(bsum=sum(df.b[1:myid()]),c=myid())
end
4×2 DataFrame
Row │ bsum c
│ Float64 Int64
─────┼─────────────────
1 │ 0.518127 2
2 │ 0.777191 3
3 │ 1.03625 4
4 │ 1.29532 5
只要您的数据框df
在您处理的条目中是数字的,您就可以将它作为矩阵来回传递:
mynames = names(df)
matrix = Matrix(df)
然后将矩阵转换为sharearray并计算。然后回到矩阵
dfprocessed = DataFrame(matrix, mynames)
注意,如果数据框架的数据类型不一致,这个方法可能会失败。如果全部为整数或全部为浮点数,则效果最好。您可能必须首先删除非数字列或将其设置为数字级别。