var 列名= "callStart_t,callend_t"//时间戳列名是动态输入。
scala> df1.show()
+------+------------+--------+----------+
| name| callStart_t|personid| callend_t|
+------+------------+--------+----------+
| Bindu|1080602418 | 2|1080602419|
|Raphel|1647964576 | 5|1647964576|
| Ram|1754536698 | 9|1754536699|
+------+------------+--------+----------+
我尝试过的代码:
val newDf = df1.withColumn("callStart_Time", to_utc_timestamp(from_unixtime($"callStart_t"/1000,"yyyy-MM-dd hh:mm:ss"),"Europe/Berlin"))
val newDf = df1.withColumn("callend_Time", to_utc_timestamp(from_unixtime($"callend_t"/1000,"yyyy-MM-dd hh:mm:ss"),"Europe/Berlin"))
在这里,我不希望新列转换(from_unixtime转换为to_utc_timestamp),我想转换现有列本身
示例输出
+------+---------------------+--------+--------------------+
| name| callStart_t |personid| callend_t |
+------+---------------------+--------+--------------------+
| Bindu|1970-01-13 04:40:02 | 2|1970-01-13 04:40:02 |
|Raphel|1970-01-20 06:16:04 | 5|1970-01-20 06:16:04 |
| Ram|1970-01-21 11:52:16 | 9|1970-01-21 11:52:16 |
+------+---------------------+--------+--------------------+
注意:时间戳列名称是动态的。
如何动态获取每一列?
只需对列使用相同的名称,它就会替换它:
val newDf = df1.withColumn("callStart_t", to_utc_timestamp(from_unixtime($"callStart_t"/1000,"yyyy-MM-dd hh:mm:ss"),"Europe/Berlin"))
val newDf = df1.withColumn("callend_t", to_utc_timestamp(from_unixtime($"callend_t"/1000,"yyyy-MM-dd hh:mm:ss"),"Europe/Berlin"))
要使其动态化,只需使用相关字符串。例如:
val colName = "callend_t"
val newDf = df.withColumn(colName , to_utc_timestamp(from_unixtime(col(colName)/1000,"yyyy-MM-dd hh:mm:ss"),"Europe/Berlin"))
对于多列,您可以执行以下操作:
val columns=Seq("callend_t", "callStart_t")
val newDf = columns.foldLeft(df1){ case (curDf, colName) => curDf.withColumn(colName , to_utc_timestamp(from_unixtime(col(colName)/1000,"yyyy-MM-dd hh:mm:ss"),"Europe/Berlin"))}
注意:如评论中所述,不需要除以 1000。