Spark Filter DataFrame返回空结果



我正在一个项目中工作,其中包含Scala和Spark处理文件,这些文件存储在HDFS中。这些文件每天早上降落在HDFS。我的工作每天都从HDFS读取该文件,对其进行处理,然后在HDFS中写入结果。将文件转换为数据框后,该作业执行过滤器,以获取包含时间戳高于最后一个文件中处理的最高时间戳的行。该过滤器仅几天才有未知的行为。有些日子可以按预期工作,而其他日子有新文件包含匹配该过滤器的行,但过滤器结果是空的。当它在测试环境中执行时,在我的本地作品中,使用具有相同HDFS连接的同一文件时,这种情况一直发生在同一文件中。

我试图以不同的方式过滤,但是在该环境中没有任何特定文件的工作,但是所有这些都可以在我的本地工作:1)Spark SQL

val diff = fp.spark.sql("select * from curr " +
s"where TO_DATE(CAST(UNIX_TIMESTAMP(substring(${updtDtCol}, 
${substrStart},${substrEnd}),'${dateFormat}') as TIMESTAMP))" +
s" > TO_DATE(CAST(UNIX_TIMESTAMP('${prevDate.substring(0,10)}' 
,'${dateFormat}') as TIMESTAMP))")

2)火花滤波器功能

val diff = df.filter(date_format(unix_timestamp(substring(col(updtDtCol),0,10),dateFormat).cast("timestamp"),dateFormat).gt(date_format(unix_timestamp(substring(col("PrevDate"),0,10),dateFormat).cast("timestamp"),dateFormat)))

3)与过滤器的结果添加额外的列,然后通过此新列

过滤
val test2 = df.withColumn("PrevDate", lit(prevDate.substring(0,10)))
      .withColumn("DatePre", date_format(unix_timestamp(substring(col("PrevDate"),0,10),dateFormat).cast("timestamp"),dateFormat))
      .withColumn("Result", date_format(unix_timestamp(substring(col(updtDtCol),0,10),dateFormat).cast("timestamp"),dateFormat).gt(date_format(unix_timestamp(substring(col("PrevDate"),0,10),dateFormat).cast("timestamp"),dateFormat)))
      .withColumn("x", when(date_format(unix_timestamp(substring(col(updtDtCol),0,10),dateFormat).cast("timestamp"),dateFormat).gt(date_format(unix_timestamp(substring(col("PrevDate"),0,10),dateFormat).cast("timestamp"),dateFormat)), lit(1)).otherwise(lit(0)))
val diff = test2.filter("x == 1")

我认为问题不是由过滤器本身或可能是由文件引起的,但我想收到有关我应该检查什么或是否有人面对此问题的反馈。

请让我知道哪些信息可以在此处发布以收到一些反馈。

文件示例的一部分如下:

|TIMESTAMP                 |Result|x|
|2017-11-30-06.46.41.288395|true  |1|
|2017-11-28-08.29.36.188395|false |0|

将时间戳值与上一个日期进行比较(例如:2017-11-29),我创建了一个称为"结果"的列,结果始终在环境中起作用,还有另一列称为" x"结果相同。

正如我之前提到的,如果我在列或列中使用"结果"或" x"中的结果函数来过滤数据帧,则有时结果是一个空数据框,结果包含数据。

我怀疑这是数据/日期格式问题。您是否有机会验证转换的日期是否如预期?

如果两个列的日期字符串都包含时区,则该行为是可以预测的。

如果其中一个只有一个时区,则在本地和远程执行时结果将有所不同。这完全取决于集群的时区。

为了调试问题,我建议您有其他列来捕获相应日期字符串的unix_timestamp(..)/millis,以及具有和附加列以捕获两列的差异。DIFF列应有助于找出转换在哪里以及为什么出错。希望这会有所帮助。

如果任何人都想知道此问题发生了什么,以及我最终如何找到错误的原因就是解释。基本上,这是由执行作业的机器的不同时区(本地机器和测试服务器)引起的。UNIX_TIMESTAMP函数返回了正确的值,请记住服务器的时区。基本上,在最后我不必使用unix_timestamp函数,也不需要使用日期字段的完整内容。下次,我将在此之前仔细检查。

最新更新