AWS EMR上的Spark 2.2.0写入Parquet会导致行数下降



所以我遇到了一个问题,在写入分区Parquet文件时,DataFrame中的某些行会被丢弃。

以下是我的步骤:

  1. 使用指定的模式从S3读取CSV数据文件
  2. 按"日期"列分区(DateType)
  3. mode=append写为Parquet

读取的第一步按预期进行,没有解析问题。为了进行质量检查,我会进行以下操作:

对于date='2012-11-22'的特定分区,对CSV文件、加载的DataFrame和镶木地板文件执行计数。

以下是一些使用pyspark:复制的代码

logs_df = spark.read.csv('s3://../logs_2012/', multiLine=True, schema=get_schema()')
logs_df.filter(logs_df.date=='2012-11-22').count() # results in 5000
logs_df.write.partitionBy('date').parquet('s3://.../logs_2012_parquet/', mode='append')
par_df = spark.read.parquet('s3://.../logs_2012_parquet/')
par_df.filter(par_df.date=='2012-11-22').count() # results in 4999, always the same record that is omitted

我也试过写HDFS,结果是一样的。这种情况发生在多个分区上。默认/空分区中没有记录。上述logs_df是准确和正确的。

我尝试的第二个实验是编写一个未分区的镶木地板文件。上述代码的唯一区别是省略了partitionBy():

logs_df.write.parquet('s3://.../logs_2012_parquet/', mode='append')

加载这套镶木地板并按上述方式进行计数,得出date='2012-11-22'和其他日期的正确结果为5000。将模式设置为overwrite或不设置(使用默认值)会导致相同的数据丢失。

我的环境是:

  • EMR 5.9.0
  • Spark 2.2.0
  • Hadoop分发:Amazon 2.7.3
  • 已尝试使用两个EMRFS一致视图,但未尝试。然而,大多数测试都是通过编写HDFS来避免任何S3一致性问题

我非常感谢使用Spark进行修复或变通,或以其他方式转换为镶木地板文件。

谢谢,

编辑:我无法重现第二个实验。因此,假设分区和未分区在写入Parquet或JSON时似乎都会丢弃记录。

所以神秘之处肯定在模式定义中。然而,出乎意料的是,它不是日期或时间戳。它实际上是布尔值。

我已经从Redshift导出了CSV,它将布尔写成tf。当我检查推断的模式时,这些字段被标记为字符串类型。在CSV文件中使用truefalse进行的一个简单测试将它们识别为布尔值。

所以我原以为日期和时间戳解析会像往常一样出错,但结果是布尔值。吸取的教训。

谢谢你的指点。

相关内容

  • 没有找到相关文章

最新更新