我正在使用Spark对用户日志文件进行探索性数据分析。我正在做的分析之一是每个主机每天的平均请求数。所以为了计算平均值,我需要将DataFrame的总请求列除以DataFrame的唯一请求列。
total_req_per_day_df = logs_df.select('host',dayofmonth('time').alias('day')).groupby('day').count()
avg_daily_req_per_host_df = total_req_per_day_df.select("day",(total_req_per_day_df["count"] / daily_hosts_df["count"]).alias("count"))
这是我使用PySpark来确定平均值所写的内容。这是我得到的错误日志
AnalysisException: u'resolved attribute(s) count#1993L missing from day#3628,count#3629L in operator !Project [day#3628,(cast(count#3629L as double) / cast(count#1993L as double)) AS count#3630];
注意:daily_hosts_df和logs_df缓存在内存中。如何划分两个数据帧的计数列?
不可能引用其他表中的列。如果你想组合数据,你必须首先使用类似的东西join
:
from pyspark.sql.functions import col
(total_req_per_day_df.alias("total")
.join(daily_hosts_df.alias("host"), ["day"])
.select(col("day"), (col("total.count") / col("host.count")).alias("count")))
这是一个来自edX Spark课程作业的问题。由于该解决方案现在已经公开,我借此机会分享了另一个较慢的解决方案,并询问它的性能是否可以改进,或者完全是反Spark的?
daily_hosts_list = (daily_hosts_df.map(lambda r: (r[0], r[1])).take(30))
days_with_hosts, hosts = zip(*daily_hosts_list)
requests = (total_req_per_day_df.map(lambda r: (r[1])).take(30))
average_requests = [(days_with_hosts[n], float(l)) for n, l in enumerate(list(np.array(requests, dtype=float) / np.array(hosts)))]
avg_daily_req_per_host_df = sqlContext.createDataFrame(average_requests, ('day', 'avg_reqs_per_host_per_day'))
在列日连接两个数据帧,然后选择计数列的日期和比率。
total_req_per_day_df = logs_df.select(dayofmonth('time')
.alias('day')
).groupBy('day').count()
avg_daily_req_per_host_df = (
total_req_per_day_df.join(daily_hosts_df,
total_req_per_day_df.day == daily_hosts_df.day
)
.select(daily_hosts_df['day'],
(total_req_per_day_df['count']/daily_hosts_df['count'])
.alias('avg_reqs_per_host_per_day')
)
.cache()
)
解决方案,基于zero323答案,但正确地用作OUTER联接。
avg_daily_req_per_host_df = (
total_req_per_day_df.join(
daily_hosts_df, daily_hosts_df['day'] == total_req_per_day_df['day'], 'outer'
).select(
total_req_per_day_df['day'],
(total_req_per_day_df['count']/daily_hosts_df['count']).alias('avg_reqs_per_host_per_day')
)
).cache()
如果没有"外部"参数,您将在其中一个数据帧中丢失天的数据。这对于PySpark Lab2任务来说并不重要,因为两个数据帧都包含相同的日期。但可能会在其他任务中造成一些痛苦:(