Spark inconsistency with an unusually encoded CSV file



Context:

  • As part of a data pipeline, I am processing some flat CSV files
  • The files have unusual encoding and escaping rules
  • My intention is to preprocess them and convert them to parquet for subsequent pipeline steps (a minimal sketch follows this list)
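
For reference, a minimal sketch of the conversion step I'm aiming for; raw/*.csv and out/ are placeholder paths, not from my actual pipeline:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("csv-to-parquet").getOrCreate()

# Read the raw CSV (reader options elided here; see the MCVE below) and
# persist it as parquet for the downstream pipeline steps.
df = spark.read.csv('raw/*.csv', header=True)  # hypothetical input path
df.write.mode('overwrite').parquet('out/')     # hypothetical output path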

MCVE:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("...").getOrCreate()

min_schema = StructType(
    [
        StructField("dummy_col", StringType(), True),
        StructField("record_id", IntegerType(), nullable=False),
        StructField("dummy_after", StringType(), nullable=False),
    ]
)

df = (
    spark.read.option("mode", "FAILFAST")
    .option("quote", '"')
    .option("escape", '"')
    .option("inferSchema", "false")
    .option("multiline", "true")
    .option("ignoreLeadingWhiteSpace", "true")
    .option("ignoreTrailingWhiteSpace", "true")
    .schema(min_schema)
    .csv('min_repro.csv', header=True)
)
Contents of min_repro.csv:

dummy_col,record_id,dummy_after
"",1,", Unusual value with comma included"
B,2,"Unusual value with escaped quote and comma ""like, this"

The CSV parses fine:

df.collect()
[Row(dummy_col=None, record_id=1, dummy_after=', Unusual value with comma included'),
Row(dummy_col='B', record_id=2, dummy_after='Unusual value with escaped quote and comma "like, this')]

However, trivial Spark code on the same DF fails with an obscure error:

if df.count() != df.select('record_id').distinct().count():
    pass
Py4JJavaError: An error occurred while calling o357.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 17.0 failed 1 times, most recent failure: Lost task 0.0 in stage 17.0 (TID 13, localhost, executor driver): org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST.
...
Caused by: java.lang.NumberFormatException: For input string: "Unusual value with comma included""
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)

I don't understand how .collect() on the same DF can return the correct rows, yet any query against the same DF fails.

Upstream bug filed: https://issues.apache.org/jira/browse/SPARK-39842
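
Until the upstream issue is resolved, a workaround sketch (my own assumption, not from the bug report): cache the DataFrame right after reading, so that later actions reuse the materialized rows instead of re-parsing the CSV under column pruning. .cache() is standard PySpark; that it sidesteps this particular bug is inferred only from .collect() returning correct rows.

# Workaround sketch: force one full-row parse, then reuse the cached rows.
df = df.cache()
df.count()  # materializes the cache with the full schema, as .collect() does

# Subsequent actions read from the cache rather than re-parsing the file
if df.count() != df.select('record_id').distinct().count():
    pass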

The correct way of handling quotes in the data is to:

  1. Enclose the data in double quotes
  2. Use the option "escapeQuotes", "true"
df = (
    spark.read.option("mode", "FAILFAST")
    .option("escapeQuotes", "true")
    .option("inferSchema", "false")
    .option("multiline", "true")
    .option("ignoreLeadingWhiteSpace", "true")
    .option("ignoreTrailingWhiteSpace", "true")
    .schema(min_schema)
    .csv('C:/Users/pc/Desktop/sample2.csv', header=True)
)
>>> df.select('dummy_after').show(truncate=False)
+-----------------------------------+
|dummy_after                        |
+-----------------------------------+
|, Unusual value with comma included|
+-----------------------------------+
>>> if df.count() != df.select('record_id').distinct().count():
...    pass
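
With this reader configuration the check completes without the FAILFAST error, and the resulting DataFrame can then be written to parquet for the downstream pipeline steps.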
