如何将 Spark 数据帧中任意数量的列从时间戳转换为长整型?



我正在用Scala编写这个,并且正在使用Spark 1.6,并且没有切换到较新版本的选项。我正在尝试合并两个数据帧,一个从Hadoop集群上的Avro文件中提取,另一个从Teradata数据库中提取。我可以很好地阅读它们,并且两者都保证以相同的顺序具有相同的列名,但是当我尝试使用

data1.unionAll(data2)

我遇到了一个错误,因为 Avro 将时间戳转换为长整型,因此两者的数据类型与这些字段不匹配。这个过程将重复几次,我知道表中总会至少有一个时间戳字段,但可能会有更多,而且我并不总是知道它们的名字,所以我正在尝试制作一个通用方法,将任意数量的列从时间戳转换为长整型。这是我到目前为止所拥有的:

def transformTimestamps(df: DataFrame): DataFrame = {
val convert_timestamp_udf = udf((time:Timestamp) => time.getTime())
df.dtypes.foreach { f => 
val fName = f._1
val fType = f._2
if (fType == "TimestampType:) {
println("Found timestamp col: " + fName)
df.withColumn(fName, convert_timestamp_udf(df.col(fName)))
df.printSchema()
}
}
return df
}

通过打印输出,我可以判断该方法仅正确识别时间戳列,但 .withColumn 转换不起作用。在下一行中打印架构不会显示更新的列。此外,我还尝试为转换后的值创建一个全新的列,但它也没有添加到 df 中。谁能发现为什么这不起作用?

以下行只是transformation

df.withColumn(fName, convert_timestamp_udf(df.col(fName)))

在执行action之前,不会反映在原始dataframe上。分配将作为一个操作工作,因此您可以创建一个临时dataframe并在循环中分配给它

def transformTimestamps(df: DataFrame): DataFrame = {
val convert_timestamp_udf = udf((time:Timestamp) => time.getTime())
var tempDF = df
df.schema.map(f => {
val fName = f.name
val fType = f.dataType
if (fType.toString == "TimestampType") {
println("Found timestamp col: " + fName)
tempDF = tempDF.withColumn(fName, convert_timestamp_udf(df.col(fName)))
tempDF.printSchema()
}
})
return tempDF
}

我希望答案对您有所帮助

避免使用可变var的一种方法是,您可以通过组合TimestampType列的列表并通过转换 UDF 的foldLeft遍历列表来执行类型转换:

import java.sql.Timestamp
val df = Seq(
(1, Timestamp.valueOf("2016-05-01 11:30:00"), "a", Timestamp.valueOf("2017-06-01 07:00:30")),
(2, Timestamp.valueOf("2016-06-01 12:30:00"), "b", Timestamp.valueOf("2017-07-01 08:00:30")),
(3, Timestamp.valueOf("2016-07-01 13:30:00"), "c", Timestamp.valueOf("2017-08-01 09:00:30"))
).toDF("id", "date1", "status", "date2")
val convert_timestamp_udf = udf( (time: Timestamp) => time.getTime() )
// Assemble all columns filtered with type TimestampType
val tsColumns = df.dtypes.filter(x => x._2 == "TimestampType")
// Create new dataframe by converting all Timestamps to Longs via foldLeft
val dfNew = tsColumns.foldLeft( df )(
(acc, x) => acc.withColumn(x._1, convert_timestamp_udf(df(x._1)))
)
dfNew.show
+---+-------------+------+-------------+
| id|        date1|status|        date2|
+---+-------------+------+-------------+
|  1|1462127400000|     a|1496325630000|
|  2|1464809400000|     b|1498921230000|
|  3|1467405000000|     c|1501603230000|
+---+-------------+------+-------------+
val index = ss.sparkContext.parallelize( Seq((1,"2017-5-5"),
(2,"2017-5-5"),
(3,"2017-5-5"),
(4,"2017-5-5"),
(5,"2017-5-5"))).toDF("ID", "time")
val convert_timestamp_udf = udf((time:Timestamp) => time.getTime())
val newDF = index.withColumn("time", convert_timestamp_udf($"time"))
newDF.show

最新更新