所以我遇到了一个问题,在写入分区Parquet文件时,DataFrame中的某些行会被丢弃。
以下是我的步骤:
- 使用指定的模式从S3读取CSV数据文件
- 按"日期"列分区(DateType)
- 用
mode=append
写为Parquet
读取的第一步按预期进行,没有解析问题。为了进行质量检查,我会进行以下操作:
对于date='2012-11-22'
的特定分区,对CSV文件、加载的DataFrame和镶木地板文件执行计数。
以下是一些使用pyspark:复制的代码
logs_df = spark.read.csv('s3://../logs_2012/', multiLine=True, schema=get_schema()')
logs_df.filter(logs_df.date=='2012-11-22').count() # results in 5000
logs_df.write.partitionBy('date').parquet('s3://.../logs_2012_parquet/', mode='append')
par_df = spark.read.parquet('s3://.../logs_2012_parquet/')
par_df.filter(par_df.date=='2012-11-22').count() # results in 4999, always the same record that is omitted
我也试过写HDFS,结果是一样的。这种情况发生在多个分区上。默认/空分区中没有记录。上述logs_df
是准确和正确的。
我尝试的第二个实验是编写一个未分区的镶木地板文件。上述代码的唯一区别是省略了partitionBy()
:
logs_df.write.parquet('s3://.../logs_2012_parquet/', mode='append')
加载这套镶木地板并按上述方式进行计数,得出date='2012-11-22'
和其他日期的正确结果为5000。将模式设置为overwrite
或不设置(使用默认值)会导致相同的数据丢失。
我的环境是:
- EMR 5.9.0
- Spark 2.2.0
- Hadoop分发:Amazon 2.7.3
- 已尝试使用两个EMRFS一致视图,但未尝试。然而,大多数测试都是通过编写HDFS来避免任何S3一致性问题
我非常感谢使用Spark进行修复或变通,或以其他方式转换为镶木地板文件。
谢谢,
编辑:我无法重现第二个实验。因此,假设分区和未分区在写入Parquet或JSON时似乎都会丢弃记录。
所以神秘之处肯定在模式定义中。然而,出乎意料的是,它不是日期或时间戳。它实际上是布尔值。
我已经从Redshift导出了CSV,它将布尔写成t
和f
。当我检查推断的模式时,这些字段被标记为字符串类型。在CSV文件中使用true
和false
进行的一个简单测试将它们识别为布尔值。
所以我原以为日期和时间戳解析会像往常一样出错,但结果是布尔值。吸取的教训。
谢谢你的指点。