我需要在pyspark中创建相当于bussiness的当前视图,我有一个历史记录文件和一个delta文件(包含ID和日期)。我需要创建最终数据帧,该数据帧将具有该文件。每个ID和该记录的单记录应为最新日期。
df1=sql_context.createDataFrame([("3000", "2017-04-19"), ("5000", "2017-04-19"), ("9012", "2017-04-19")], ["id", "date"])
df2=sql_context.createDataFrame([("3000", "2017-04-18"), ("5120", "2017-04-18"), ("1012", "2017-04-18")], ["id", "date"])
df3 = df2.union(df1).distinct()
+----+----------+
| id| date|
+----+----------+
|3000|2017-04-19|
|3000|2017-04-18|
|5120|2017-04-18|
|5000|2017-04-19|
|1012|2017-04-18|
|9012|2017-04-19|
---- ------------
我尝试进行联盟并进行独特的工作,它给了我ID = 3000,因为我只需要ID = 300的日期= 2017-04-19
的记录= 3000甚至减去也无法工作,因为它返回了DF的所有行。
所需的输出: -
+----+----------+
| id| date|
+----+----------+
|3000|2017-04-19|
|
|5120|2017-04-18|
|5000|2017-04-19|
|1012|2017-04-18|
|9012|2017-04-19|
+----+----------+
希望这会有所帮助!
from pyspark.sql.functions import unix_timestamp, col, to_date, max
#sample data
df1=sqlContext.createDataFrame([("3000", "2017-04-19"),
("5000", "2017-04-19"),
("9012", "2017-04-19")],
["id", "date"])
df2=sqlContext.createDataFrame([("3000", "2017-04-18"),
("5120", "2017-04-18"),
("1012", "2017-04-18")],
["id", "date"])
df=df2.union(df1)
df.show()
#convert 'date' column to date type so that latest date can be fetched for an ID
df = df.
withColumn('date_inDateFormat',to_date(unix_timestamp(col('date'),"yyyy-MM-dd").cast("timestamp"))).
drop('date')
#get latest date for an ID
df = df.groupBy('id').agg(max('date_inDateFormat').alias('date'))
df.show()
输出是:
+----+----------+
| id| date|
+----+----------+
|5000|2017-04-19|
|1012|2017-04-18|
|5120|2017-04-18|
|9012|2017-04-19|
|3000|2017-04-19|
+----+----------+
注意:请不要忘记让答案是否可以帮助您解决问题。