我有一个pyspark df,它有一个小时列(int(,如下所示:
小时 |
---|
0 |
0 |
1 |
14 |
如果使用spark 3+,则可以使用make_timestamp
函数,否则可以使用UDF。
spark.range(15).
withColumnRenamed('id', 'hour').
withColumn('static_dttm', func.lit(execution_datetime).cast('timestamp')).
withColumn('dttm',
func.expr('''make_timestamp(year(static_dttm),
month(static_dttm),
day(static_dttm),
hour,
minute(static_dttm),
second(static_dttm)
)
''')
).
drop('static_dttm').
show()
# +----+-------------------+
# |hour| dttm|
# +----+-------------------+
# | 0|2022-01-02 00:23:11|
# | 1|2022-01-02 01:23:11|
# | 2|2022-01-02 02:23:11|
# | 3|2022-01-02 03:23:11|
# | 4|2022-01-02 04:23:11|
# | 5|2022-01-02 05:23:11|
# | 6|2022-01-02 06:23:11|
# | 7|2022-01-02 07:23:11|
# | 8|2022-01-02 08:23:11|
# | 9|2022-01-02 09:23:11|
# | 10|2022-01-02 10:23:11|
# | 11|2022-01-02 11:23:11|
# | 12|2022-01-02 12:23:11|
# | 13|2022-01-02 13:23:11|
# | 14|2022-01-02 14:23:11|
# +----+-------------------+
使用UDF
def update_ts(string_ts, hour_col):
import datetime
dttm = datetime.datetime.strptime(string_ts, '%Y-%m-%d %H:%M:%S')
return datetime.datetime(dttm.year, dttm.month, dttm.day, hour_col, dttm.minute, dttm.second)
update_ts_udf = func.udf(update_ts, TimestampType())
spark.range(15).
withColumnRenamed('id', 'hour').
withColumn('dttm', update_ts_udf(func.lit(execution_datetime), func.col('hour'))).
show()
# +----+-------------------+
# |hour| dttm|
# +----+-------------------+
# | 0|2022-01-02 00:23:11|
# | 1|2022-01-02 01:23:11|
# | 2|2022-01-02 02:23:11|
# | 3|2022-01-02 03:23:11|
# | 4|2022-01-02 04:23:11|
# | 5|2022-01-02 05:23:11|
# | 6|2022-01-02 06:23:11|
# | 7|2022-01-02 07:23:11|
# | 8|2022-01-02 08:23:11|
# | 9|2022-01-02 09:23:11|
# | 10|2022-01-02 10:23:11|
# | 11|2022-01-02 11:23:11|
# | 12|2022-01-02 12:23:11|
# | 13|2022-01-02 13:23:11|
# | 14|2022-01-02 14:23:11|
# +----+-------------------+
您可以使用concat
连接一行中的多个字符串,使用lit
为每行添加一个常数值。
在下面的代码中,引入了一个新的列timestamp
,其中execution_datetime
的前11个字符由小时后和小时之间的字符组成。它还确保了时间有一个前导零。
import pyspark.sql.functions as f
df = df.withColumn('timestamp', f.concat(f.lit(execution_datetime [0:11]), f.lpad(f.col('hour'), 2, '0') , f.lit(f.lit(execution_datetime [13:]))))
备注:这可能是比samkart答案中建议的使用时间戳函数更快的版本,但在捕捉错误输入时也不太安全