在pyspark中读取未分区csv文件时跳过特定行



我有一个未分区的gzip csv文件,我正在读到spark。读入压缩后的文件不是问题,但是一旦使用操作对spark数据框进行评估,触及一个特定的违规行,就会抛出错误。如果我使用df.limit(),我可以将读取时的数据帧子集设置为违规观察之前行的行号,并且可以随后继续我的工作流程而不会出错。

我的问题是,有没有办法在观察中跳过阅读?我想沿着df.limit_range(100:200)的行做一些事情,其中在读取csv时跳过100-200行。我尝试了各种方法来生成索引列,然后进行过滤,但是我在评估时遇到了问题。下面我尝试在问题行之前的所有行进行子集,然后与原始未过滤的数据框反连接,但是一旦评估了问题行,就会再次导致错误,表明无法读取压缩后的文件。

df_full = df.withColumn("rowId", monotonically_increasing_id())
df_head = df_full.limit(100).where(col("rowID") < 99)
anti_df = df_full.join(df_head, "id", "left_anti")

错误:

FileReadException: Error while reading file s3a://some-s3-bucket/dir/subdir/file_name.gz.
Caused by: EOFException: Unexpected end of input stream

您可以在列上使用过滤器读取除100-200行以外的所有行。

from pyspark.sql import functions as f
df_full = df.withColumn("rowId", f.monotonically_increasing_id())
anti_df = df_full.filter("rowId <= 100 or rowId >= 200")

anti_df的输出将是:

+----+-----+
|   z|rowId|
+----+-----+
:
:
|3.38|   95|
| 3.4|   96|
|4.07|   97|
|3.56|   98|
|3.66|   99|
|3.65|  100|
|3.43|  200|
|3.49|  201|
|3.48|  202|
| 3.6|  203|
|4.08|  204|
|3.63|  205|
:
:

确保你的过滤器在你的火花计划中被推下。我的意思是,过滤器应该在读取之后立即执行,而不是在对其执行多次计算之后执行(此时您的代码可能会因为令人讨厌的错误而失败)。

最新更新