如何在Scala Spark中的Epoch Timestame字段进行分组



我想按日期按记录进行分组。但是日期是在毫秒的时期时间戳。这是示例数据。

date,   Col1
1506838074000,  a
1506868446000,  b
1506868534000,  c
1506869064000,  a
1506869211000,  c
1506871846000,  f
1506874462000,  g
1506879651000,  a

这是我要实现的目标。

**date  Count of records**
02-10-2017  4
04-10-2017  3
03-10-2017  5

这是我尝试通过的代码,

import java.text.SimpleDateFormat
val dateformat:SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
val df = sqlContext.read.csv("<path>")
val result = df.select("*").groupBy(dateformat.format($"date".toLong)).agg(count("*").alias("cnt")).select("date","cnt")

但是在执行代码时,我要低于异常。

   <console>:30: error: value toLong is not a member of org.apache.spark.sql.ColumnName
         val t = df.select("*").groupBy(dateformat.format($"date".toLong)).agg(count("*").alias("cnt")).select("date","cnt")

请帮助我解决问题。

您需要更改似乎在long中的 date 列,为date数据类型。这可以通过使用from_unixtime内置功能来完成。然后它只是groupByagg函数调用并使用count函数。

import org.apache.spark.sql.functions._
def stringDate = udf((date: Long) => new java.text.SimpleDateFormat("dd-MM-yyyy").format(date))
df.withColumn("date", stringDate($"date"))
    .groupBy("date")
    .agg(count("Col1").as("Count of records"))
    .show(false) 

上面的答案是使用UDF函数,应尽可能避免使用UDF,因为UDF是一个黑匣子,需要序列化和列表。

更新

感谢@philantrovert的建议除以1000

import org.apache.spark.sql.functions._
df.withColumn("date", from_unixtime($"date"/1000, "yyyy-MM-dd"))
    .groupBy("date")
    .agg(count("Col1").as("Count of records"))
    .show(false)

双向工作。

最新更新