I have this JSON data, and I want to aggregate it hourly on the 'timestamp' column, summing up 'a' and 'b'.
{"a":1 , "b":1, "timestamp":"2017-01-26T01:14:55.719214Z"}
{"a":1 , "b":1,"timestamp":"2017-01-26T01:14:55.719214Z"}
{"a":1 , "b":1,"timestamp":"2017-01-26T02:14:55.719214Z"}
{"a":1 , "b":1,"timestamp":"2017-01-26T03:14:55.719214Z"}
This is the final output I want:
{"a":2 , "b":2, "timestamp":"2017-01-26T01:00:00"}
{"a":1 , "b":1,"timestamp":"2017-01-26T02:00:00"}
{"a":1 , "b":1,"timestamp":"2017-01-26T03:00:00"}
This is what I have written so far:
from pyspark.sql import functions as f

df = spark.read.json(inputfile)
df2 = df.groupby("timestamp").agg(f.sum(df["a"]), f.sum(df["b"]))
But how should I change the value of the 'timestamp' column before calling groupBy? Thanks in advance!
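For completeness, one minimal way this could look is to cut the timestamp string down to its hour before grouping. This is only a sketch, reusing inputfile from the question and assuming every timestamp follows the ISO layout shown above:

from pyspark.sql import functions as f

df = spark.read.json(inputfile)
# keep "yyyy-MM-ddTHH" from the original string, then pad it back to a full hour bucket,
# e.g. "2017-01-26T01:14:55.719214Z" -> "2017-01-26T01:00:00"
hour_col = f.concat(df["timestamp"].substr(1, 13), f.lit(":00:00"))
df2 = (df.withColumn("timestamp", hour_col)
         .groupBy("timestamp")
         .agg(f.sum("a").alias("a"), f.sum("b").alias("b")))
df2.show(truncate=False)

Since this works purely on the string, it avoids any time zone conversion.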
from pyspark.sql import functions as f

df = spark.read.load(path='file:///home/zht/PycharmProjects/test/disk_file', format='json')
# parse the ISO string into a proper timestamp column (interpreted as EST and converted to UTC)
df = df.withColumn('ts', f.to_utc_timestamp(df['timestamp'], 'EST'))
# build fixed, non-overlapping one-hour windows over the parsed timestamp
win = f.window(df['ts'], windowDuration='1 hour')
# sum 'a' and 'b' within each hourly window
df = df.groupBy(win).agg(f.sum(df['a']).alias('sumA'), f.sum(df['b']).alias('sumB'))
# keep only the window start as the bucket's timestamp
res = df.select(df['window']['start'].alias('start_time'), df['sumA'], df['sumB'])
res.show(truncate=False)
# output:
+---------------------+----+----+
|start_time |sumA|sumB|
+---------------------+----+----+
|2017-01-26 15:00:00.0|1 |1 |
|2017-01-26 16:00:00.0|1 |1 |
|2017-01-26 14:00:00.0|2 |2 |
+---------------------+----+----+
f.window is more flexible.
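If the output should also match the string format asked for in the question, the window start can be reformatted afterwards. A small addition to the code above, assuming "yyyy-MM-dd'T'HH:mm:ss" is the desired layout:

res = res.withColumn('timestamp', f.date_format(res['start_time'], "yyyy-MM-dd'T'HH:mm:ss"))
res.select('timestamp', 'sumA', 'sumB').show(truncate=False)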
I guess this is one way to do it:
df2 = df.withColumn("r_timestamp",df["r_timestamp"].substr(0,12)).groupby("timestamp").agg(f.sum(df["a"],f.sum(df["b"])
Is there a better solution to get the timestamp in the desired format?
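One possible alternative, assuming Spark 2.3+ (where date_trunc is available) and keeping in mind that how the trailing 'Z' is parsed depends on the session time zone:

from pyspark.sql import functions as f

# round the parsed timestamp down to the hour, then format it back into the
# string layout requested in the question
hour_col = f.date_format(f.date_trunc("hour", df["timestamp"].cast("timestamp")),
                         "yyyy-MM-dd'T'HH:mm:ss")
df2 = df.withColumn("timestamp", hour_col).groupby("timestamp").agg(f.sum(df["a"]), f.sum(df["b"]))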