朱莉娅 - 在文件中具有函数的工作器上加载错误(主)



我开始尝试使用Julia进行并行处理。

我在此示例中使用的是@spawn宏,但在使用remotecall_fetch函数时遇到了相同的错误。

以下是代码:

function count_proteins(fpath::String)
cnt::Int = 0
if !isfile(fpath)
write(Base.stderr, "FASTA not found!")
else
reader = open(FASTA.Reader, fpath)
for record in reader
cnt += 1
end
end
# return the count
cnt
end

"""Count sequences in parallel."""
function parallel_count_proteins(fPaths::Array{String, 1}, threads::Int16=4)    
# initialize workers
addprocs(threads)
fut = Dict{Int, Future}()
# launch the jobs
for (i, fastaPath) in enumerate(fPaths)
r = @spawn count_proteins(fastaPath)
fut[i] = r
end
for (i, res) in fut
s = fetch(res)
end
end
### MAIN ###
flist = ["f1", "f2", "f3", "f4"]
threads = Int16(2)
parallel_count_proteins(flist, threads)

当我尝试使用fetch()获取结果时会发生错误:

错误:

加载错误:在工作线程 3 上

。这是堆栈跟踪:

Stacktrace:
[1] #remotecall_fetch#149(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::Function, ::Function, ::Distributed.Worker, ::Distributed.RRID) at /Users/osx/buildbot/slave/package_osx64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:379
[2] remotecall_fetch(::Function, ::Distributed.Worker, ::Distributed.RRID, ::Vararg{Any,N} where N) at /Users/osx/buildbot/slave/package_osx64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:371
[3] #remotecall_fetch#152 at /Users/osx/buildbot/slave/package_osx64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:406 [inlined]
[4] remotecall_fetch at /Users/osx/buildbot/slave/package_osx64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:406 [inlined]
[5] call_on_owner at /Users/osx/buildbot/slave/package_osx64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:479 [inlined]
[6] fetch(::Future) at /Users/osx/buildbot/slave/package_osx64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:511
[7] parallel_count_proteins(::Array{String,1}, ::Int16) at /Users/salvocos/Google_Drive/julia_programming/mcl_graph_to_label.jl:150
[8] top-level scope at none:0
[9] include at ./boot.jl:326 [inlined]
[10] include_relative(::Module, ::String) at ./loading.jl:1038
[11] include(::Module, ::String) at ./sysimg.jl:29
[12] exec_options(::Base.JLOptions) at ./client.jl:267
[13] _start() at ./client.jl:436

我知道需要让所有工人都意识到该功能count_proteins的用途,但我不确定如何做到这一点。

正如您所说,您需要使count_proteins可用于所有工作进程。

您可以在函数定义之前使用@everywhere宏,以使其可供所有工作线程使用。@everywhere对所有工作线程执行给定的表达式。

另一种方法是将应该可供工作人员使用的函数放在另一个.jl文件和@everywhere include("my_helper_functions.jl")中,或者将函数定义放在begin...end块中,并在begin之前放置一个@everywhere并运行该块。您需要在创建工作进程后执行此操作。将此类函数放在模块/包中并运行@everywhere using MyModule也应该有效。

对于您的代码,解决方案将是

# addprocs here before @everywhere definitions
addprocs(2)
@everywhere function count_proteins(fpath::String)
cnt::Int = 0
if !isfile(fpath)
write(Base.stderr, "FASTA not found!")
else
reader = open(FASTA.Reader, fpath)
for record in reader
cnt += 1
end
end
# return the count
cnt
end

"""Count sequences in parallel."""
function parallel_count_proteins(fPaths::Array{String, 1})
fut = Dict{Int, Future}()
# launch the jobs
for (i, fastaPath) in enumerate(fPaths)
r = @spawn count_proteins(fastaPath)
fut[i] = r
end
for (i, res) in fut
s = fetch(res)
end
end
### MAIN ###
flist = ["f1", "f2", "f3", "f4"]
parallel_count_proteins(flist)

作为旁注,如果我了解您要正确执行的操作,则可以在此处简单地使用pmap,这会将任务一个接一个地发送到进程,从而有效地平衡负载。

您可能会发现阅读有关并行计算中的代码和数据可用性的手册条目以及整个并行计算部分很有用。对于数据可用性部分,还有一个名为ParallelDataTransfer.jl的软件包,如果需要,它可以在进程之间移动数据变得更加容易。

正如上面@hckr很好地解释的那样,在使用@everywhere宏之前,应该部署工作线程(使用 addprocs(线程))。

@everywhere可以以不同的方式和程序的不同部分调用和使用。 就我而言,我正在加载要从模块并行运行的函数。

要从主并行使用此功能,我正在使用@everywhere include("myModule.jl").

以下是 MyModule 的代码:

module MyModule    
using Distributed
using Printf: @printf
using Base
"""Count sequences in the input FASTA"""
function count_proteins(fpath::String)::Int
cnt::Int = 0
#@show fpath
if !isfile(fpath)
write(Base.stderr, "nInput FASTA not found!")
else
open(fpath, "r") do ifd
for ln in eachline(ifd)
if ln[1] == '>'
#println(ln)
cnt += 1
end
end
end
end
# return the count
@printf("%st%dn", fpath, cnt)
cnt
end
"""Count sequences in parallel."""
function parallel_count_proteins(fPaths::Array{String, 1})
# spawn the jobs
for (i, fastaPath) in enumerate(fPaths)
r = @spawn count_proteins(fastaPath)
# @show r
s = fetch(r)
end    
end

以下是使用函数main.jl来自MyModuleparallel_count_proteins.

### main.jl ###
using Base
using Distributed
using Printf: @printf
# add path to the modules directory
push!(LOAD_PATH, dirname(@__FILE__)) # MyModule is in the same directory as main.jl
#### MAIN START ####
# deploy the workers
addprocs(4)
# load modules with multi-core functions
@everywhere include(joinpath(dirname(@__FILE__), "MyModule.jl"))
# paths with 4 input files (all in same dir as main.jl)
flist = ["tv1", "tv2", "tv3", "tv4"]
# count proteins
MyModule.parallel_count_proteins(flist)

相关内容

  • 没有找到相关文章

最新更新