我正在使用pySpark来读取和计算数据帧的统计信息。
数据帧如下所示:
TRANSACTION_URL START_TIME END_TIME SIZE FLAG COL6 COL7 ...
www.google.com 20170113093210 20170113093210 150 1 ... ...
www.cnet.com 20170113114510 20170113093210 150 2 ... ...
我正在向数据帧添加新的timePeriod
列,添加后,我想保存前 50K 条记录,timePeriod
与某个预定义的值匹配。
我的目的是将这些行保存到带有数据帧标题的 CSV 中。
我知道这应该是col
和write.csv
的结合,但我不确定如何正确地使用这些来实现我的意图。
我当前的代码是:
encodeUDF = udf(encode_time, StringType())
log_df = log_df.withColumn('timePeriod', encodeUDF(col('START_TIME')))
添加列后,我猜我应该使用类似的东西:
log_df.select(col('timePeriod') == 'Weekday').write.csv(....)
有人可以帮我填补这里的空白,以符合我的意图吗?
unix_timestamp
和date_format
在这里是有用的方法START_TIME
因为不是时间戳类型。
dfWithDayNum = log_df.withColumn("timePeriod", date_format(
unix_timestamp(col("START_TIME"), "yyyyMMddHHmmss").cast(TimestampType), "u")
)
timePeriod
将有一周的天数(1 = 星期一,...,7 = 星期日(
dfWithDayNum
.filter(col("timePeriod") < 6) //to filter weekday
.limit(50000) //X lines
.write.format("csv")
.option("header", "true")
.csv("location/to/save/df")
使用以下filter()
和limit()
方法按以下方式求解:
new_log_df.filter(col('timePeriod') == '20161206, Morning').limit(50).write.
format('csv').option("header", "true").save("..Path..")