划分不同DataFrames的两列



我正在使用Spark对用户日志文件进行探索性数据分析。我正在做的分析之一是每个主机每天的平均请求数。所以为了计算平均值,我需要将DataFrame的总请求列除以DataFrame的唯一请求列。

total_req_per_day_df = logs_df.select('host',dayofmonth('time').alias('day')).groupby('day').count()
avg_daily_req_per_host_df = total_req_per_day_df.select("day",(total_req_per_day_df["count"] / daily_hosts_df["count"]).alias("count"))

这是我使用PySpark来确定平均值所写的内容。这是我得到的错误日志

AnalysisException: u'resolved attribute(s) count#1993L missing from day#3628,count#3629L in operator !Project [day#3628,(cast(count#3629L as double) / cast(count#1993L as double)) AS count#3630];

注意:daily_hosts_df和logs_df缓存在内存中。如何划分两个数据帧的计数列?

不可能引用其他表中的列。如果你想组合数据,你必须首先使用类似的东西join

from pyspark.sql.functions import col
(total_req_per_day_df.alias("total")
    .join(daily_hosts_df.alias("host"), ["day"])
    .select(col("day"), (col("total.count") / col("host.count")).alias("count")))

这是一个来自edX Spark课程作业的问题。由于该解决方案现在已经公开,我借此机会分享了另一个较慢的解决方案,并询问它的性能是否可以改进,或者完全是反Spark的?

daily_hosts_list = (daily_hosts_df.map(lambda r: (r[0], r[1])).take(30))
days_with_hosts, hosts = zip(*daily_hosts_list)
requests = (total_req_per_day_df.map(lambda r: (r[1])).take(30))
average_requests = [(days_with_hosts[n], float(l)) for n, l in enumerate(list(np.array(requests, dtype=float) / np.array(hosts)))]
avg_daily_req_per_host_df = sqlContext.createDataFrame(average_requests, ('day', 'avg_reqs_per_host_per_day'))

在列日连接两个数据帧,然后选择计数列的日期和比率。

total_req_per_day_df = logs_df.select(dayofmonth('time')
                                      .alias('day')
                                     ).groupBy('day').count()
avg_daily_req_per_host_df = (
  total_req_per_day_df.join(daily_hosts_df, 
                            total_req_per_day_df.day == daily_hosts_df.day
                           )
  .select(daily_hosts_df['day'], 
          (total_req_per_day_df['count']/daily_hosts_df['count'])
           .alias('avg_reqs_per_host_per_day')
          )
  .cache()
)

解决方案,基于zero323答案,但正确地用作OUTER联接。

avg_daily_req_per_host_df = (
  total_req_per_day_df.join(
      daily_hosts_df, daily_hosts_df['day'] == total_req_per_day_df['day'], 'outer'
  ).select(
      total_req_per_day_df['day'], 
      (total_req_per_day_df['count']/daily_hosts_df['count']).alias('avg_reqs_per_host_per_day')
  )
).cache()

如果没有"外部"参数,您将在其中一个数据帧中丢失天的数据。这对于PySpark Lab2任务来说并不重要,因为两个数据帧都包含相同的日期。但可能会在其他任务中造成一些痛苦:(

相关内容

  • 没有找到相关文章

最新更新