首先,我将名为 calendar 的 Spark DataFrame 写出为一个名为 cal 的 Parquet 文件:
calendar.write.parquet("/user/vusal.babashov/dataset/cal", mode="overwrite")
然后,我将它从Hadoop复制到我的个人文件夹中。
hdfs dfs -copyToLocal -f /user/vusal.babashov/dataset/cal /home/vusal.babashov/dataset
最后,我尝试用 Dask 读取这个 Parquet 文件。
import dask.dataframe as dd
df = dd.read_parquet('/home/vusal.babashov/dataset/cal')
最后一步是我不断得到 OSError: [Errno 22] Invalid argument(无效参数)错误的地方。路径是正确的,名为 cal 的 Parquet 文件确实在那里(我可以确认)。
---------------------------------------------------------------------------
OSError Traceback (most recent call last)
<ipython-input-12-372cb3d97d10> in <module>
1 import dask.dataframe as dd
----> 2 df = dd.read_parquet('/home/vusal.babashov/dataset/cal')
3 df
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py in read_parquet(path, columns, filters, categories, index, storage_options, engine, gather_statistics, split_row_groups, read_from_paths, chunksize, aggregate_files, **kwargs)
325 chunksize=chunksize,
326 aggregate_files=aggregate_files,
--> 327 **kwargs,
328 )
329
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/fastparquet.py in read_metadata(cls, fs, paths, categories, index, gather_statistics, filters, split_row_groups, chunksize, aggregate_files, **kwargs)
732 # correspond to a row group (populated below).
733 parts, pf, gather_statistics, base_path = _determine_pf_parts(
--> 734 fs, paths, gather_statistics, **kwargs
735 )
736
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/fastparquet.py in _determine_pf_parts(fs, paths, gather_statistics, **kwargs)
163 elif gather_statistics is not False:
164 # Scan every file
--> 165 pf = ParquetFile(paths, open_with=fs.open, **kwargs.get("file", {}))
166 else:
167 # Use _common_metadata file if it is available.
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, root, sep, fs)
91 if isinstance(fn, (tuple, list)):
92 basepath, fmd = metadata_from_many(fn, verify_schema=verify,
---> 93 open_with=open_with, root=root)
94 if basepath:
95 self.fn = join_path(basepath, '_metadata') # effective file
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/util.py in metadata_from_many(file_list, verify_schema, open_with, root, fs)
145
146 if verify_schema or fs is None or len(file_list) < 3:
--> 147 pfs = [api.ParquetFile(fn, open_with=open_with) for fn in file_list]
148 else:
149 # activate new code path here
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/util.py in <listcomp>(.0)
145
146 if verify_schema or fs is None or len(file_list) < 3:
--> 147 pfs = [api.ParquetFile(fn, open_with=open_with) for fn in file_list]
148 else:
149 # activate new code path here
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, root, sep, fs)
119 self.fn = join_path(fn)
120 with open_with(fn, 'rb') as f:
--> 121 self._parse_header(f, verify)
122 elif "*" in fn or fs.isdir(fn):
123 fn2 = join_path(fn, '_metadata')
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/api.py in _parse_header(self, f, verify)
161 if verify:
162 assert f.read(4) == b'PAR1'
--> 163 f.seek(-8, 2)
164 head_size = struct.unpack('<i', f.read(4))[0]
165 if verify:
OSError: [Errno 22] Invalid argument
当我运行 df = dd.read_parquet('/home/vusal.babashov/dataset/cal.parquet', engine='pyarrow') 时,我得到以下错误:
RuntimeError Traceback (most recent call last)
<ipython-input-1-9b03dc4d018b> in <module>
1 import dask.dataframe as dd
----> 2 df = dd.read_parquet('/home/vusal.babashov/dataset/cal.parquet', engine='pyarrow')
3 df
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py in read_parquet(path, columns, filters, categories, index, storage_options, engine, gather_statistics, split_row_groups, read_from_paths, chunksize, aggregate_files, **kwargs)
290
291 if isinstance(engine, str):
--> 292 engine = get_engine(engine)
293
294 if hasattr(path, "name"):
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py in get_engine(engine)
917
918 if pa_version < parse_version("0.13.1"):
--> 919 raise RuntimeError("PyArrow version >= 0.13.1 required")
920
921 if engine == "pyarrow-dataset" and pa_version.major >= 1:
RuntimeError: PyArrow version >= 0.13.1 required
我在服务器上使用的是 Spark 2.4 和 Python 3.7。我的 PyArrow 版本是 0.11.1,升级到其他版本会导致环境不一致。根据 Dask 的文档,默认引擎是 auto,它会选择 fastparquet(我已经安装了它)。当我运行 df = dd.read_parquet('/home/vusal.babashov/dataset/cal', engine='auto') 时,我会得到相同的 OSError: [Errno 22] Invalid argument(无效参数)错误:
--
OSError Traceback (most recent call last)
<ipython-input-8-361b3123f3d5> in <module>
1 import dask.dataframe as dd
----> 2 df = dd.read_parquet('/home/vusal.babashov/dataset/cal', engine='auto')
3 df
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py in read_parquet(path, columns, filters, categories, index, storage_options, engine, gather_statistics, split_row_groups, read_from_paths, chunksize, aggregate_files, **kwargs)
325 chunksize=chunksize,
326 aggregate_files=aggregate_files,
--> 327 **kwargs,
328 )
329
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/fastparquet.py in read_metadata(cls, fs, paths, categories, index, gather_statistics, filters, split_row_groups, chunksize, aggregate_files, **kwargs)
732 # correspond to a row group (populated below).
733 parts, pf, gather_statistics, base_path = _determine_pf_parts(
--> 734 fs, paths, gather_statistics, **kwargs
735 )
736
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/fastparquet.py in _determine_pf_parts(fs, paths, gather_statistics, **kwargs)
163 elif gather_statistics is not False:
164 # Scan every file
--> 165 pf = ParquetFile(paths, open_with=fs.open, **kwargs.get("file", {}))
166 else:
167 # Use _common_metadata file if it is available.
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, root, sep, fs)
91 if isinstance(fn, (tuple, list)):
92 basepath, fmd = metadata_from_many(fn, verify_schema=verify,
---> 93 open_with=open_with, root=root)
94 if basepath:
95 self.fn = join_path(basepath, '_metadata') # effective file
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/util.py in metadata_from_many(file_list, verify_schema, open_with, root, fs)
145
146 if verify_schema or fs is None or len(file_list) < 3:
--> 147 pfs = [api.ParquetFile(fn, open_with=open_with) for fn in file_list]
148 else:
149 # activate new code path here
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/util.py in <listcomp>(.0)
145
146 if verify_schema or fs is None or len(file_list) < 3:
--> 147 pfs = [api.ParquetFile(fn, open_with=open_with) for fn in file_list]
148 else:
149 # activate new code path here
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, root, sep, fs)
119 self.fn = join_path(fn)
120 with open_with(fn, 'rb') as f:
--> 121 self._parse_header(f, verify)
122 elif "*" in fn or fs.isdir(fn):
123 fn2 = join_path(fn, '_metadata')
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/api.py in _parse_header(self, f, verify)
161 if verify:
162 assert f.read(4) == b'PAR1'
--> 163 f.seek(-8, 2)
164 head_size = struct.unpack('<i', f.read(4))[0]
165 if verify:
OSError: [Errno 22] Invalid argument
Parquet文件快照
“Invalid argument”(无效参数)表示代码正在尝试用 seek() 定位到文件中不存在的位置。读取 Parquet 文件的元数据页脚时,文件的最后 8 个字节中包含页脚的大小,这就是为什么要执行 seek(-8, 2)(即定位到文件末尾之前 8 个字节处)。
情况很可能是这样的:Spark 在目录中写入了一些不是数据文件的额外文件,比如奇怪的零长度 "_SUCCESS" 文件以及各种校验和文件;但没有写出 _metadata 文件来告诉 Dask 哪些文件应被视为数据文件。
这个筛选真正数据文件的问题,我相当确定已经在最新版本的 dask 和 fastparquet 中解决了(pyarrow 很可能也是如此)。如果无法升级,则需要删除那些造成混淆的文件。