我有该表格的信息(显然是假的,但有目的):
| User | Country |
|------|---------|
| A | Sweden |
| A | Sweden |
| A | London |
| B | Spain |
| B | Denmark |
| B | Brazil |
| C | India |
这可以作为Spark中的数据框提供。我一直在想使用Spark(也许是SparkSQL)来为每个用户计算频率图。
(A => Map((Sweden, 2), (London, 1)))
(B => Map((Spain, 1), (Brazil, 1), (Denmark, 1)))
(C => Map((India, 1)))
到目前为止,我已经到了:
(A => (Sweden, 2))
(A => (London, 1))
(B => (Spain, 1))
(B => (Brazil, 1))
(B => (Denmark, 1))
(C => (India, 1))
使用以下查询:
SELECT user, country, COUNT(country) as frequency
FROM information
GROUP BY user, country
但是问题是我最终得到了6行而不是3。不确定从这里去哪里。
您可以使用struct(Country, Frequency)
应用另一个groupBy/agg
使用collect_list
,如下所示:
import org.apache.spark.sql.functions._
import spark.implicits._
val df = Seq(
("A", "Sweden"), ("A", "Sweden"), ("A", "London"),
("B", "Spain"), ("B", "Denmark"), ("B", "Brazil"),
("C", "India")
).toDF("User", "Country")
df.
groupBy("User", "Country").agg(count("Country").as("Frequency")).
groupBy("User").agg(collect_list(struct("Country", "Frequency")).as("Country_Counts")).
show(false)
// +----+------------------------------------+
// |User|Country_Counts |
// +----+------------------------------------+
// |B |[[Denmark,1], [Brazil,1], [Spain,1]]|
// |C |[[India,1]] |
// |A |[[London,1], [Sweden,2]] |
// +----+------------------------------------+
请注意,第一个groupBy/agg
转换等效于您的SQL查询。
之后,您需要按用户进行分组并收集国家和频率的地图。以下代码应该有用。
//Creating Test Data
val df = Seq(("A", "Sweden"), ("A", "Sweden"), ("A", "London"), ("B", "Spain"), ("B", "Denmark"), ("B", "Brazil"), ("C", "India"))
.toDF("user", "country")
df.show(false)
+----+-------+
|user|country|
+----+-------+
|A |Sweden |
|A |Sweden |
|A |London |
|B |Spain |
|B |Denmark|
|B |Brazil |
|C |India |
+----+-------+
df.registerTempTable("information")
val joinMap = spark.udf.register( "joinMap" , (values: Seq[Map[String,Long]]) => values.flatten.toMap )
val resultDF = spark.sql("""SELECT user, joinMap(collect_list(map(country, frequency))) as frequencyMap
|From ( SELECT user, country, COUNT(country) as frequency
|FROM information
|GROUP BY user, country ) A
|GROUP BY user""".stripMargin)
resultDF.show(false)
+----+------------------------------------------+
|user|frequencyMap |
+----+------------------------------------------+
|A |Map(Sweden -> 2, London -> 1) |
|B |Map(Spain -> 1, Denmark -> 1, Brazil -> 1)|
|C |Map(India -> 1) |
+----+------------------------------------------+
如果您希望最终结果作为映射,请使用UDF。没有UDF,您将获得它作为地图列表。