SparkSQL Groupby创建嵌套记录



我有该表格的信息(显然是假的,但有目的):

| User | Country |
|------|---------|
| A    | Sweden  |
| A    | Sweden  |
| A    | London  |
| B    | Spain   |
| B    | Denmark |
| B    | Brazil  |
| C    | India   |

这可以作为Spark中的数据框提供。我一直在想使用Spark(也许是SparkSQL)来为每个用户计算频率图。

(A => Map((Sweden, 2), (London, 1)))
(B => Map((Spain, 1), (Brazil, 1), (Denmark, 1)))
(C => Map((India, 1)))

到目前为止,我已经到了:

(A => (Sweden, 2))
(A => (London, 1))
(B => (Spain, 1))
(B => (Brazil, 1))
(B => (Denmark, 1))
(C => (India, 1))

使用以下查询:

SELECT user, country, COUNT(country) as frequency
FROM information
GROUP BY user, country

但是问题是我最终得到了6行而不是3。不确定从这里去哪里。

您可以使用struct(Country, Frequency)应用另一个groupBy/agg使用collect_list,如下所示:

import org.apache.spark.sql.functions._
import spark.implicits._
val df = Seq(
  ("A", "Sweden"), ("A", "Sweden"), ("A", "London"),
  ("B", "Spain"), ("B", "Denmark"), ("B", "Brazil"),
  ("C", "India")
).toDF("User", "Country")
df.
  groupBy("User", "Country").agg(count("Country").as("Frequency")).
  groupBy("User").agg(collect_list(struct("Country", "Frequency")).as("Country_Counts")).
  show(false)
// +----+------------------------------------+
// |User|Country_Counts                      |
// +----+------------------------------------+
// |B   |[[Denmark,1], [Brazil,1], [Spain,1]]|
// |C   |[[India,1]]                         |
// |A   |[[London,1], [Sweden,2]]            |
// +----+------------------------------------+

请注意,第一个groupBy/agg转换等效于您的SQL查询。

之后,您需要按用户进行分组并收集国家和频率的地图。以下代码应该有用。

//Creating Test Data
val df = Seq(("A", "Sweden"), ("A", "Sweden"), ("A", "London"), ("B", "Spain"), ("B", "Denmark"), ("B", "Brazil"), ("C", "India"))
  .toDF("user", "country")
df.show(false)
+----+-------+
|user|country|
+----+-------+
|A   |Sweden |
|A   |Sweden |
|A   |London |
|B   |Spain  |
|B   |Denmark|
|B   |Brazil |
|C   |India  |
+----+-------+
df.registerTempTable("information")
val joinMap = spark.udf.register( "joinMap" , (values: Seq[Map[String,Long]]) => values.flatten.toMap )
val resultDF = spark.sql("""SELECT user, joinMap(collect_list(map(country, frequency))) as frequencyMap
                           |From ( SELECT user, country, COUNT(country) as frequency
                           |FROM information
                           |GROUP BY user, country ) A
                           |GROUP BY user""".stripMargin)
resultDF.show(false)
+----+------------------------------------------+
|user|frequencyMap                              |
+----+------------------------------------------+
|A   |Map(Sweden -> 2, London -> 1)             | 
|B   |Map(Spain -> 1, Denmark -> 1, Brazil -> 1)|
|C   |Map(India -> 1)                           |
+----+------------------------------------------+

如果您希望最终结果作为映射,请使用UDF。没有UDF,您将获得它作为地图列表。

相关内容

  • 没有找到相关文章

最新更新