我正在使用 Dask 来执行以下逻辑:
- 从多个输入文件读取主延迟
dd.DataFrame
(每个文件一个pd.DataFrame
) - 在主延迟数据帧上执行多个
query
调用 - 使用
DataFrame.to_hdf
保存DataFrame.query
调用中的所有数据帧。
如果我在to_hdf
调用中使用compute=False
并将每个to_hdf
调用返回的Delayed
列表提供给dask.compute
那么我就会得到崩溃/seg 错误。(如果我省略compute=False
一切正常)。一些谷歌搜索给了我一些关于锁的信息;我尝试添加一个dask.distributed.Client
,dask.distributed.Lock
喂给to_hdf
,以及一个dask.utils.SerializableLock
,但我无法解决崩溃。
流程如下:
import uproot
import dask
import dask.dataframe as dd
from dask.delayed import delayed
def delayed_frame(files, tree_name):
"""create master delayed DataFrame from multiple files"""
@delayed
def single_frame(file_name, tree_name):
"""read external file, convert to pandas.DataFrame, return it"""
tree = uproot.open(file_name).get(tree_name)
return tree.pandas.df() ## this is the pd.DataFrame
return dd.from_delayed([single_frame(f, tree_name) for f in files])
def save_selected_frames(df, selections, prefix):
"""perform queries on a delayed DataFrame and save HDF5 output"""
queries = {sel_name: df.query(sel_query)
for sel_name, sel_query in selections.items()]
computes = []
for dfname, df in queries.items():
outname = f"{prefix}_{dfname}.h5"
computes.append(df.to_hdf(outname, f"/{prefix}", compute=False))
dask.compute(*computes)
selections = {"s1": "(A == True) & (N > 1)",
"s2": "(B == True) & (N > 2)",
"s3": "(C == True) & (N > 3)"}
from glob import glob
df = delayed_frame(glob("/path/to/files/*.root"), "selected")
save_selected_frames(df, selections, "selected")
## expect output files:
## - selected_s1.h5
## - selected_s2.h5
## - selected_s3.h5
也许你正在使用的HDF库不是完全线程安全的? 如果不介意失去并行性,则可以将scheduler="single-threaded"
添加到计算调用中。
您可能需要考虑使用镶木地板而不是HDF。 它有更少的这样的问题。