Spark/Scala迭代两个数据帧



我是spark/scala世界的新手。我有两个数据源

  1. 包含网址和主机名的流量数据

  2. 定义流量网址规则的属性数据。规则是与域名匹配的正则表达式模式。一个属性 ID 可能有一个或多个规则。

如果 URL 符合条件,我必须分配一个属性 ID。流量中的每一行都可以匹配零个或多个属性条件

示例输入traffic-data

visitor_id | url                      
1000-abc10 | www.motor.com/index.html
2000-fe30a | www.lifestyle.com/cooking/pasta.html 

'属性数据

attribute_id | rule                               | describtion
101          | motor.com, auto*.com, vehicles.com | "vehicles"
102          | motor.com                          | "auto site"

预期产出:

visitor_id  | attribute_id
1000-abc10  | 101
1000-abc10  | 202

我尝试了以下方法:

val traffic_df = spark.read.parquet(<traffic-path>).as[Traffic]
val attribute_df = spark.read.parquet(<attribute-path>).as[Attribute]
traffic_df.map(row => attribute_df.map(r => TrafficAttribute(row.visitor_id, r.attribute_id)))
case class Traffic(visitor_id: String, page_url : String)
case class ConfigRow(attribute_id: String, rule: String, description: String)
case class OutputRow(visitor_id: String, attribute_id)
val configList = spark.sqlContext.read.json(<config-path>).as[ConfigRow].collect().toList
val trafficDF = spark.read.json(<traffic-path>).as[Traffic]
def determineAttributes(row: Traffic, configList: List[ConfigRow]): ListBuffer[String] = {
val attributeList = new ListBuffer[String]
for (c <- configList) {
rule = c.rule;
if (<rule matches>) attributeList += c.attribute_id
}
attributeList
}
for r = trafficDF.flatMap((row:Traffic) => {
for (attributeId <- determineAttributes(row, configList)) yield {
OutputRow(row.visitor_id, attributeId)
}
}) 

您可以在 2 个数据集上使用具有特殊连接条件的连接:

val joinCondition = $"a.url".contains($"b.rule")
var joinedDf = trafficDf.as('a).join(attributeDf.as('b),joinCondition)
joinedDf.show()
+----------+--------------------+------------+---------+-----------+
|visitor_id|                 url|attribute_id|     rule|describtion|
+----------+--------------------+------------+---------+-----------+
|1000-abc10|www.motor.com/ind...|         101|motor.com|   vehicles|
|1000-abc10|www.motor.com/ind...|         102|motor.com|  auto site|
+----------+--------------------+------------+---------+-----------+

然后,您可以使用joinedDf.select("visitor_id","attribute_id")选择所需的列

最新更新