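I am using PySpark. I have a column ('dt') in a dataframe ('canon_evt') that is a timestamp. I am trying to remove the seconds from a DateTime value. It is originally read in from Parquet as a string. I then try to convert it to a timestamp via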
canon_evt = canon_evt.withColumn('dt',to_date(canon_evt.dt))
canon_evt= canon_evt.withColumn('dt',canon_evt.dt.astype('Timestamp'))
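Then I would like to remove the seconds. I tried 'trunc', 'date_format', and even tried concatenating the pieces together as shown below. I think it requires some combination of map and lambda, but I'm not certain whether Timestamp is an appropriate format, or whether it's possible to get rid of the seconds at all.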
canon_evt = canon_evt.withColumn('dyt',year('dt') + '-' + month('dt') +
'-' + dayofmonth('dt') + ' ' + hour('dt') + ':' + minute('dt'))
[Row(dt=datetime.datetime(2015, 9, 16, 0, 0),dyt=None)]
Spark>=2.3
You can use date_trunc:
from pyspark.sql.functions import col, date_trunc
df.withColumn("dt_truncated", date_trunc("minute", col("dt"))).show()
## +-------------------+-------------------+
## |                 dt|       dt_truncated|
## +-------------------+-------------------+
## |1970-01-01 00:00:00|1970-01-01 00:00:00|
## |2015-09-16 05:39:46|2015-09-16 05:39:00|
## |2015-09-16 05:40:46|2015-09-16 05:40:00|
## |2016-03-05 02:00:10|2016-03-05 02:00:00|
## +-------------------+-------------------+
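To sanity-check the expected values outside Spark, the effect of truncating to a unit can be mimicked in plain Python. This is only a rough sketch of the semantics shown in the table above, not Spark code; the helper name truncate_to and its small set of supported units are assumptions made for illustration.

```python
from datetime import datetime

def truncate_to(ts: datetime, unit: str) -> datetime:
    """Zero out every field smaller than `unit` (hypothetical helper, plain Python)."""
    if unit == "minute":
        return ts.replace(second=0, microsecond=0)
    if unit == "hour":
        return ts.replace(minute=0, second=0, microsecond=0)
    if unit == "day":
        return ts.replace(hour=0, minute=0, second=0, microsecond=0)
    raise ValueError("Unsupported unit: %s" % unit)

samples = [
    datetime(1970, 1, 1, 0, 0, 0),
    datetime(2015, 9, 16, 5, 39, 46),
    datetime(2015, 9, 16, 5, 40, 46),
    datetime(2016, 3, 5, 2, 0, 10),
]
for ts in samples:
    # Same inputs as the table above, truncated to the minute
    print(ts, "->", truncate_to(ts, "minute"))
```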
Spark<2.3
Converting to a Unix timestamp and doing some basic arithmetic should do the trick:
from pyspark.sql import Row
from pyspark.sql.functions import col, unix_timestamp, round
df = sc.parallelize([
    Row(dt='1970-01-01 00:00:00'),
    Row(dt='2015-09-16 05:39:46'),
    Row(dt='2015-09-16 05:40:46'),
    Row(dt='2016-03-05 02:00:10'),
]).toDF()
## unix_timestamp converts string to Unix timestamp (bigint / long)
## in seconds. Divide by 60, round, multiply by 60 and cast
## should work just fine. Note that round goes to the nearest
## minute (see the output below); use floor to always round down.
dt_truncated = ((round(unix_timestamp(col("dt")) / 60) * 60)
    .cast("timestamp"))
df.withColumn("dt_truncated", dt_truncated).show(10, False)
## +-------------------+---------------------+
## |dt                 |dt_truncated         |
## +-------------------+---------------------+
## |1970-01-01 00:00:00|1970-01-01 00:00:00.0|
## |2015-09-16 05:39:46|2015-09-16 05:40:00.0|
## |2015-09-16 05:40:46|2015-09-16 05:41:00.0|
## |2016-03-05 02:00:10|2016-03-05 02:00:00.0|
## +-------------------+---------------------+
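The output above shows 05:39:46 becoming 05:40:00, i.e. the value is rounded to the nearest minute rather than having its seconds dropped. The difference between the two is just floor versus round on the epoch seconds. Here is a minimal plain-Python sketch of that arithmetic; the helper names truncate_epoch, round_epoch and to_utc are made up for illustration. In PySpark, the analogous change would presumably be to use floor from pyspark.sql.functions in place of round.

```python
from datetime import datetime, timezone

def truncate_epoch(seconds: int, unit: int = 60) -> int:
    """Floor to the start of the current unit (drops the remainder)."""
    return (seconds // unit) * unit

def round_epoch(seconds: int, unit: int = 60) -> int:
    """Round half up to the nearest unit boundary (non-negative input assumed)."""
    return ((seconds + unit // 2) // unit) * unit

def to_utc(seconds: int) -> datetime:
    """Convert epoch seconds back to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(seconds, tz=timezone.utc)

ts = int(datetime(2015, 9, 16, 5, 39, 46, tzinfo=timezone.utc).timestamp())
print(to_utc(truncate_epoch(ts)))  # 2015-09-16 05:39:00+00:00
print(to_utc(round_epoch(ts)))     # 2015-09-16 05:40:00+00:00
```

Passing unit=3600 gives the same behaviour at hour granularity.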
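This question was asked a few years ago, but in case anyone else comes across it: as of Spark v2.3 this has been added as a feature. Assuming canon_evt is a dataframe with a timestamp column dt that we want to remove the seconds from, it is now as simple as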
from pyspark.sql.functions import date_trunc
canon_evt = canon_evt.withColumn('dt', date_trunc('minute', canon_evt.dt))
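I think zero323 has the best answer. It's a bit annoying that Spark doesn't support this natively, given how easy it is to implement. For posterity, here is a function I use: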
def trunc(date, format):
    """Wraps Spark's trunc function to support day, minute, and hour."""
    import re
    import pyspark.sql.functions as func

    # Crude hack to get the column name from a Column object or a string:
    try:
        colname = re.match(r"Column<.?'(.*)'>", str(date)).groups()[0]
    except AttributeError:
        colname = date

    alias = "trunc(%s, %s)" % (colname, format)

    if format in ('year', 'YYYY', 'yy', 'month', 'mon', 'mm'):
        return func.trunc(date, format).alias(alias)
    elif format in ('day', 'DD'):
        return func.date_sub(date, 0).alias(alias)
    elif format in ('min', ):
        # floor (rather than round) so that 05:39:46 becomes 05:39:00, not 05:40:00
        return ((func.floor(func.unix_timestamp(date) / 60) * 60).cast("timestamp")).alias(alias)
    elif format in ('hour', ):
        return ((func.floor(func.unix_timestamp(date) / 3600) * 3600).cast("timestamp")).alias(alias)
    else:
        raise ValueError("Unsupported format: %s" % format)
To truncate the timestamp to some other interval, for example 5 minutes, 10 minutes or 7 minutes:
from pyspark.sql.functions import sum, window  # note: this sum shadows Python's built-in sum
df = spark.createDataFrame([("2016-03-11 09:00:07", 1, 5),("2016-03-11 09:00:57", 2, 5)]).toDF("date", "val1","val2")
# Group by val2 and by a tumbling window; other durations such as "10 minutes" or "7 minutes" work the same way.
w = df.groupBy('val2',window("date", "5 minutes")).agg(sum("val1").alias("sum"))
w.select(w.window.start.cast("string").alias("start"),w.window.end.cast("string").alias("end"), "sum", "val2").show(10, False)
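The start of each window is, in effect, the timestamp floored to a multiple of the interval. As far as I know, Spark aligns windows to the Unix epoch unless a start offset is given, so the same bucketing can be sketched as arithmetic on epoch seconds. The following is plain Python for illustration only, and bucket_start is a hypothetical helper name.

```python
from datetime import datetime, timezone

def bucket_start(ts: datetime, minutes: int) -> datetime:
    """Floor ts to the start of its N-minute bucket, counted from the Unix epoch."""
    unit = minutes * 60
    seconds = int(ts.timestamp())
    return datetime.fromtimestamp((seconds // unit) * unit, tz=timezone.utc)

sample = datetime(2016, 3, 11, 9, 7, 57, tzinfo=timezone.utc)
print(bucket_start(sample, 5))   # 2016-03-11 09:05:00+00:00
print(bucket_start(sample, 10))  # 2016-03-11 09:00:00+00:00
print(bucket_start(sample, 7))   # boundary depends on alignment to the epoch
```

For intervals that do not divide an hour evenly, such as 7 minutes, the bucket boundaries will not line up with the top of the hour, because they are counted from the epoch.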