I am new to the Spark/Scala world. I have two data sources:
traffic data, containing URLs and hostnames
attribute data, defining rules for traffic URLs. A rule is a regex pattern matched against the domain name, and one attribute ID may have one or more rules.
I have to assign an attribute ID whenever a URL qualifies. Each row of traffic can match zero or more attribute conditions.
Example input traffic-data:
visitor_id | url
1000-abc10 | www.motor.com/index.html
2000-fe30a | www.lifestyle.com/cooking/pasta.html
Attribute data:
attribute_id | rule | description
101 | motor.com, auto*.com, vehicles.com | "vehicles"
102 | motor.com | "auto site"
Expected output:
visitor_id | attribute_id
1000-abc10 | 101
1000-abc10 | 102
I tried the following approaches:
val traffic_df = spark.read.parquet(<traffic-path>).as[Traffic]
val attribute_df = spark.read.parquet(<attribute-path>).as[Attribute]
traffic_df.map(row => attribute_df.map(r => TrafficAttribute(row.visitor_id, r.attribute_id)))
case class Traffic(visitor_id: String, page_url : String)
case class ConfigRow(attribute_id: String, rule: String, description: String)
case class OutputRow(visitor_id: String, attribute_id: String)
val configList = spark.sqlContext.read.json(<config-path>).as[ConfigRow].collect().toList
val trafficDF = spark.read.json(<traffic-path>).as[Traffic]
def determineAttributes(row: Traffic, configList: List[ConfigRow]): ListBuffer[String] = {
  val attributeList = new ListBuffer[String]
  for (c <- configList) {
    val rule = c.rule
    if (<rule matches>) attributeList += c.attribute_id
  }
  attributeList
}
val result = trafficDF.flatMap((row: Traffic) => {
  for (attributeId <- determineAttributes(row, configList)) yield {
    OutputRow(row.visitor_id, attributeId)
  }
})
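The helper above can be made compilable. A minimal plain-Scala sketch, assuming the rule field holds one or more comma-separated regex patterns and an attribute matches when any of its patterns is found in the URL (the splitting and matching logic here are my assumptions, not from the original post):

```scala
// Sketch of a compilable determineAttributes, taking the URL string directly.
// Assumption: "rule" is a comma-separated list of regex patterns.
case class ConfigRow(attribute_id: String, rule: String, description: String)

def determineAttributes(url: String, configs: List[ConfigRow]): List[String] =
  configs.collect {
    // Split the rule string into individual patterns, trim whitespace,
    // and keep the attribute id if any pattern matches the URL.
    case c if c.rule.split(",").map(_.trim).exists(p => p.r.findFirstIn(url).isDefined) =>
      c.attribute_id
  }
```

Since configList is a small driver-side collection, it can be used inside the flatMap as above, yielding one OutputRow per (visitor, matching attribute) pair.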
You can use a join with a special join condition on the 2 datasets:
val joinCondition = $"a.url".contains($"b.rule")
val joinedDf = trafficDf.as("a").join(attributeDf.as("b"), joinCondition)
joinedDf.show()
+----------+--------------------+------------+---------+-----------+
|visitor_id| url|attribute_id| rule|description|
+----------+--------------------+------------+---------+-----------+
|1000-abc10|www.motor.com/ind...| 101|motor.com| vehicles|
|1000-abc10|www.motor.com/ind...| 102|motor.com| auto site|
+----------+--------------------+------------+---------+-----------+
Then you can select the needed columns with joinedDf.select("visitor_id", "attribute_id").
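One caveat worth checking: contains compares the URL against the whole rule string, so a row like 101's, which stores several comma-separated patterns in one field, never matches until the rule column is split into individual patterns (e.g. with split plus explode before the join). A plain-Scala check of the condition on the sample rows:

```scala
val url = "www.motor.com/index.html"

// A single-pattern rule (row 102) matches with a plain substring check:
val singleRuleMatches = url.contains("motor.com")

// A multi-pattern rule (row 101) does not, because the whole string
// "motor.com, auto*.com, vehicles.com" is compared at once:
val multiRuleMatches = url.contains("motor.com, auto*.com, vehicles.com")

// Splitting the rule field into individual patterns recovers the match:
val patterns = "motor.com, auto*.com, vehicles.com".split(",").map(_.trim)
val anyPatternMatches = patterns.exists(url.contains)
```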