延迟dask.dataframe.DataFrame.to_hdf计算崩溃



我正在使用 Dask 来执行以下逻辑:

  • 从多个输入文件读取主延迟dd.DataFrame(每个文件一个pd.DataFrame)
  • 在主延迟数据帧上执行多个query调用
  • 使用DataFrame.to_hdf保存DataFrame.query调用中的所有数据帧。

如果我在to_hdf调用中使用compute=False并将每个to_hdf调用返回的Delayed列表提供给dask.compute那么我就会得到崩溃/seg 错误。(如果我省略compute=False一切正常)。一些谷歌搜索给了我一些关于锁的信息;我尝试添加一个dask.distributed.Clientdask.distributed.Lock喂给to_hdf,以及一个dask.utils.SerializableLock,但我无法解决崩溃。

流程如下:

import uproot
import dask
import dask.dataframe as dd
from dask.delayed import delayed
def delayed_frame(files, tree_name):
"""create master delayed DataFrame from multiple files"""
@delayed
def single_frame(file_name, tree_name):
"""read external file, convert to pandas.DataFrame, return it"""
tree = uproot.open(file_name).get(tree_name)
return tree.pandas.df() ## this is the pd.DataFrame
return dd.from_delayed([single_frame(f, tree_name) for f in files])
def save_selected_frames(df, selections, prefix):
"""perform queries on a delayed DataFrame and save HDF5 output"""
queries = {sel_name: df.query(sel_query)
for sel_name, sel_query in selections.items()]
computes = []
for dfname, df in queries.items():
outname = f"{prefix}_{dfname}.h5"
computes.append(df.to_hdf(outname, f"/{prefix}", compute=False))
dask.compute(*computes)

selections = {"s1": "(A == True) & (N > 1)",
"s2": "(B == True) & (N > 2)",
"s3": "(C == True) & (N > 3)"}
from glob import glob
df = delayed_frame(glob("/path/to/files/*.root"), "selected")
save_selected_frames(df, selections, "selected")
## expect output files:
##  - selected_s1.h5
##  - selected_s2.h5
##  - selected_s3.h5

也许你正在使用的HDF库不是完全线程安全的? 如果不介意失去并行性,则可以将scheduler="single-threaded"添加到计算调用中。

您可能需要考虑使用镶木地板而不是HDF。 它有更少的这样的问题。

最新更新