将DF中的datetime列替换为int列中的hour



我有一个pyspark df,它有一个小时列(int(,如下所示:

小时
0
0
1
14

如果使用spark 3+,则可以使用make_timestamp函数,否则可以使用UDF。

spark.range(15). 
withColumnRenamed('id', 'hour'). 
withColumn('static_dttm', func.lit(execution_datetime).cast('timestamp')). 
withColumn('dttm',
func.expr('''make_timestamp(year(static_dttm), 
month(static_dttm), 
day(static_dttm), 
hour, 
minute(static_dttm), 
second(static_dttm)
)
''')
). 
drop('static_dttm'). 
show()
# +----+-------------------+
# |hour|               dttm|
# +----+-------------------+
# |   0|2022-01-02 00:23:11|
# |   1|2022-01-02 01:23:11|
# |   2|2022-01-02 02:23:11|
# |   3|2022-01-02 03:23:11|
# |   4|2022-01-02 04:23:11|
# |   5|2022-01-02 05:23:11|
# |   6|2022-01-02 06:23:11|
# |   7|2022-01-02 07:23:11|
# |   8|2022-01-02 08:23:11|
# |   9|2022-01-02 09:23:11|
# |  10|2022-01-02 10:23:11|
# |  11|2022-01-02 11:23:11|
# |  12|2022-01-02 12:23:11|
# |  13|2022-01-02 13:23:11|
# |  14|2022-01-02 14:23:11|
# +----+-------------------+

使用UDF

def update_ts(string_ts, hour_col):
import datetime
dttm = datetime.datetime.strptime(string_ts, '%Y-%m-%d %H:%M:%S')
return datetime.datetime(dttm.year, dttm.month, dttm.day, hour_col, dttm.minute, dttm.second)
update_ts_udf = func.udf(update_ts, TimestampType())
spark.range(15). 
withColumnRenamed('id', 'hour'). 
withColumn('dttm', update_ts_udf(func.lit(execution_datetime), func.col('hour'))). 
show()
# +----+-------------------+
# |hour|               dttm|
# +----+-------------------+
# |   0|2022-01-02 00:23:11|
# |   1|2022-01-02 01:23:11|
# |   2|2022-01-02 02:23:11|
# |   3|2022-01-02 03:23:11|
# |   4|2022-01-02 04:23:11|
# |   5|2022-01-02 05:23:11|
# |   6|2022-01-02 06:23:11|
# |   7|2022-01-02 07:23:11|
# |   8|2022-01-02 08:23:11|
# |   9|2022-01-02 09:23:11|
# |  10|2022-01-02 10:23:11|
# |  11|2022-01-02 11:23:11|
# |  12|2022-01-02 12:23:11|
# |  13|2022-01-02 13:23:11|
# |  14|2022-01-02 14:23:11|
# +----+-------------------+

您可以使用concat连接一行中的多个字符串,使用lit为每行添加一个常数值。

在下面的代码中,引入了一个新的列timestamp,其中execution_datetime的前11个字符由小时后和小时之间的字符组成。它还确保了时间有一个前导零。

import pyspark.sql.functions as f
df = df.withColumn('timestamp', f.concat(f.lit(execution_datetime [0:11]), f.lpad(f.col('hour'), 2, '0') , f.lit(f.lit(execution_datetime [13:]))))

备注:这可能是比samkart答案中建议的使用时间戳函数更快的版本,但在捕捉错误输入时也不太安全