Converting a Spark DataFrame to a Dask DataFrame



First, I write my Spark DataFrame, named calendar, as a Parquet file called cal.

calendar.write.parquet("/user/vusal.babashov/dataset/cal", mode="overwrite")

Then I copy it from Hadoop (HDFS) to my personal folder.

hdfs dfs -copyToLocal -f /user/vusal.babashov/dataset/cal /home/vusal.babashov/dataset

Finally, I try to read the Parquet file into Dask.

import dask.dataframe as dd

df = dd.read_parquet('/home/vusal.babashov/dataset/cal')

That last step is where I keep getting OSError: [Errno 22] Invalid argument. The path is correct, and the Parquet file named cal is there (I can confirm it).

---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
<ipython-input-12-372cb3d97d10> in <module>
1 import dask.dataframe as dd
----> 2 df = dd.read_parquet('/home/vusal.babashov/dataset/cal')
3 df
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py in read_parquet(path, columns, filters, categories, index, storage_options, engine, gather_statistics, split_row_groups, read_from_paths, chunksize, aggregate_files, **kwargs)
325         chunksize=chunksize,
326         aggregate_files=aggregate_files,
--> 327         **kwargs,
328     )
329 
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/fastparquet.py in read_metadata(cls, fs, paths, categories, index, gather_statistics, filters, split_row_groups, chunksize, aggregate_files, **kwargs)
732         # correspond to a row group (populated below).
733         parts, pf, gather_statistics, base_path = _determine_pf_parts(
--> 734             fs, paths, gather_statistics, **kwargs
735         )
736 
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/fastparquet.py in _determine_pf_parts(fs, paths, gather_statistics, **kwargs)
163         elif gather_statistics is not False:
164             # Scan every file
--> 165             pf = ParquetFile(paths, open_with=fs.open, **kwargs.get("file", {}))
166         else:
167             # Use _common_metadata file if it is available.
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, root, sep, fs)
91         if isinstance(fn, (tuple, list)):
92             basepath, fmd = metadata_from_many(fn, verify_schema=verify,
---> 93                                                open_with=open_with, root=root)
94             if basepath:
95                 self.fn = join_path(basepath, '_metadata')  # effective file
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/util.py in metadata_from_many(file_list, verify_schema, open_with, root, fs)
145 
146         if verify_schema or fs is None or len(file_list) < 3:
--> 147             pfs = [api.ParquetFile(fn, open_with=open_with) for fn in file_list]
148         else:
149             # activate new code path here
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/util.py in <listcomp>(.0)
145 
146         if verify_schema or fs is None or len(file_list) < 3:
--> 147             pfs = [api.ParquetFile(fn, open_with=open_with) for fn in file_list]
148         else:
149             # activate new code path here
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, root, sep, fs)
119                 self.fn = join_path(fn)
120                 with open_with(fn, 'rb') as f:
--> 121                     self._parse_header(f, verify)
122             elif "*" in fn or fs.isdir(fn):
123                 fn2 = join_path(fn, '_metadata')
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/api.py in _parse_header(self, f, verify)
161                 if verify:
162                     assert f.read(4) == b'PAR1'
--> 163                 f.seek(-8, 2)
164                 head_size = struct.unpack('<i', f.read(4))[0]
165                 if verify:
OSError: [Errno 22] Invalid argument

When I run df = dd.read_parquet('/home/vusal.babashov/dataset/cal.parquet', engine='pyarrow'), I get the following error:


RuntimeError                              Traceback (most recent call last)
<ipython-input-1-9b03dc4d018b> in <module>
1 import dask.dataframe as dd
----> 2 df = dd.read_parquet('/home/vusal.babashov/dataset/cal.parquet', engine='pyarrow')
3 df
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py in read_parquet(path, columns, filters, categories, index, storage_options, engine, gather_statistics, split_row_groups, read_from_paths, chunksize, aggregate_files, **kwargs)
290 
291     if isinstance(engine, str):
--> 292         engine = get_engine(engine)
293 
294     if hasattr(path, "name"):
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py in get_engine(engine)
917 
918         if pa_version < parse_version("0.13.1"):
--> 919             raise RuntimeError("PyArrow version >= 0.13.1 required")
920 
921         if engine == "pyarrow-dataset" and pa_version.major >= 1:
RuntimeError: PyArrow version >= 0.13.1 required

I am using Spark 2.4 and Python 3.7 on the server. My PyArrow version is 0.11.1, and upgrading to another version causes environment inconsistencies. According to the Dask documentation, the default engine is 'auto', which picks fastparquet (which I have installed). When I run df = dd.read_parquet('/home/vusal.babashov/dataset/cal', engine='auto'), I get the same OSError: [Errno 22] Invalid argument:


---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
<ipython-input-8-361b3123f3d5> in <module>
1 import dask.dataframe as dd
----> 2 df = dd.read_parquet('/home/vusal.babashov/dataset/cal', engine='auto')
3 df
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py in read_parquet(path, columns, filters, categories, index, storage_options, engine, gather_statistics, split_row_groups, read_from_paths, chunksize, aggregate_files, **kwargs)
325         chunksize=chunksize,
326         aggregate_files=aggregate_files,
--> 327         **kwargs,
328     )
329 
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/fastparquet.py in read_metadata(cls, fs, paths, categories, index, gather_statistics, filters, split_row_groups, chunksize, aggregate_files, **kwargs)
732         # correspond to a row group (populated below).
733         parts, pf, gather_statistics, base_path = _determine_pf_parts(
--> 734             fs, paths, gather_statistics, **kwargs
735         )
736 
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/fastparquet.py in _determine_pf_parts(fs, paths, gather_statistics, **kwargs)
163         elif gather_statistics is not False:
164             # Scan every file
--> 165             pf = ParquetFile(paths, open_with=fs.open, **kwargs.get("file", {}))
166         else:
167             # Use _common_metadata file if it is available.
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, root, sep, fs)
91         if isinstance(fn, (tuple, list)):
92             basepath, fmd = metadata_from_many(fn, verify_schema=verify,
---> 93                                                open_with=open_with, root=root)
94             if basepath:
95                 self.fn = join_path(basepath, '_metadata')  # effective file
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/util.py in metadata_from_many(file_list, verify_schema, open_with, root, fs)
145 
146         if verify_schema or fs is None or len(file_list) < 3:
--> 147             pfs = [api.ParquetFile(fn, open_with=open_with) for fn in file_list]
148         else:
149             # activate new code path here
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/util.py in <listcomp>(.0)
145 
146         if verify_schema or fs is None or len(file_list) < 3:
--> 147             pfs = [api.ParquetFile(fn, open_with=open_with) for fn in file_list]
148         else:
149             # activate new code path here
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, root, sep, fs)
119                 self.fn = join_path(fn)
120                 with open_with(fn, 'rb') as f:
--> 121                     self._parse_header(f, verify)
122             elif "*" in fn or fs.isdir(fn):
123                 fn2 = join_path(fn, '_metadata')
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/api.py in _parse_header(self, f, verify)
161                 if verify:
162                     assert f.read(4) == b'PAR1'
--> 163                 f.seek(-8, 2)
164                 head_size = struct.unpack('<i', f.read(4))[0]
165                 if verify:
OSError: [Errno 22] Invalid argument
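
Since this Dask release rejects any pyarrow older than 0.13.1 outright, a quick check of the installed reader libraries (a minimal sketch, not part of the original post) confirms which engines are even usable in this environment:

import pyarrow
import fastparquet

# pyarrow 0.11.1 is below the 0.13.1 minimum enforced in the traceback above,
# so only the fastparquet engine is a realistic option here.
print("pyarrow:", pyarrow.__version__)
print("fastparquet:", fastparquet.__version__)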

Snapshot of the Parquet file

Invalid argument means the code is trying to seek() to a location in the file that doesn't exist. When reading a Parquet file's metadata footer, the final 8 bytes contain the size of the footer, which is why it seeks to (-8, 2) (8 bytes before the end of the file).
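
As an illustration (a minimal sketch; the helper name and path are hypothetical), the same footer check that appears in the traceback can be reproduced directly, and on a zero-length file the seek itself fails in exactly this way:

import struct

def check_parquet_footer(path):
    # Mirrors the footer check from the traceback: the last 8 bytes of a
    # parquet file hold the footer length (4 bytes) followed by b'PAR1'.
    with open(path, 'rb') as f:
        f.seek(-8, 2)                          # 8 bytes before the end of the file
        footer_len = struct.unpack('<i', f.read(4))[0]
        assert f.read(4) == b'PAR1', "not a parquet file"
        return footer_len

# On a zero-length file (such as a _SUCCESS marker), f.seek(-8, 2) raises
# OSError: [Errno 22] Invalid argument, which is the error seen above.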

The likely situation is that Spark wrote some extra files into the directory which are not data files, such as the strange zero-length "_SUCCESS" file and any checksums, but did not write a _metadata file to tell Dask which files should be treated as data.
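
A quick listing of the copied directory (a diagnostic sketch using the path from the question) usually makes the culprits visible:

import os

# List everything that was copied out of HDFS; besides the part-*.parquet
# data files, this will typically also show _SUCCESS and hidden .*.crc
# checksum files, but no _metadata file.
for name in sorted(os.listdir('/home/vusal.babashov/dataset/cal')):
    print(name)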

This problem of filtering for the real data files has very likely been solved in the latest dask and fastparquet (and perhaps arrow too). If you cannot update, you will need to delete the files that are causing the confusion.
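
If updating is not an option, one workaround (a sketch, assuming Spark's usual part-* file naming) is either to remove the non-data files or to point read_parquet only at the data files via a glob:

import glob
import os
import dask.dataframe as dd

base = '/home/vusal.babashov/dataset/cal'

# Option 1: delete the zero-length _SUCCESS marker and any .crc checksum files
for pattern in ('_SUCCESS', '.*.crc', '*.crc'):
    for path in glob.glob(os.path.join(base, pattern)):
        os.remove(path)

# Option 2: skip them entirely by globbing only the actual data files
df = dd.read_parquet(os.path.join(base, 'part-*.parquet'), engine='fastparquet')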
