在Spark RDD或使用另一个RDD/df的数据帧中执行查找/转换



我很难实现一些看起来应该很容易的东西:

我的目标是使用另一个RDD/dataframe作为查找表或翻译字典在一个RDD/dataframe中进行翻译。我想把这些翻译放在多个列中。

解释这个问题最简单的方法是举例说明。假设我有以下两个rdd作为输入:
Route SourceCityID DestinationCityID
A     1            2
B     1            3
C     2            1

CityID CityName
1      London
2      Paris
3      Tokyo

我想要的输出RDD是:

Route SourceCity DestinationCity
A     London     Paris
B     London     Tokyo
C     Paris      London

我该如何生产它呢?

这是SQL中的一个简单问题,但我不知道Spark中的rdd有明显的解决方案。joincogroup等方法似乎不太适合多列rdd,并且不允许指定要连接的列。

任何想法?SQLContext是答案吗?

方法:

routes = sc.parallelize([("A", 1, 2),("B", 1, 3), ("C", 2, 1) ])
cities = sc.parallelize([(1, "London"),(2, "Paris"), (3, "Tokyo")])

print routes.map(lambda x: (x[1], (x[0], x[2]))).join(cities) 
.map(lambda x: (x[1][0][1], (x[1][0][0], x[1][1]))).join(cities). 
map(lambda x: (x[1][0][0], x[1][0][1], x[1][1])).collect()

打印:

[('C', 'Paris', 'London'), ('A', 'London', 'Paris'), ('B', 'London', 'Tokyo')]

和SQLContext方式:

from pyspark.sql import HiveContext
from pyspark.sql import SQLContext
df_routes = sqlContext.createDataFrame(
routes, ["Route", "SourceCityID", "DestinationCityID"])
df_cities = sqlContext.createDataFrame(
cities, ["CityID", "CityName"])
temp =  df_routes.join(df_cities, df_routes.SourceCityID == df_cities.CityID) 
.select("Route", "DestinationCityID", "CityName")
.withColumnRenamed("CityName", "SourceCity")
print temp.join(df_cities, temp.DestinationCityID == df_cities.CityID) 
.select("Route", "SourceCity", "CityName")
.withColumnRenamed("CityName", "DestinationCity").collect()

打印:

[Row(Route=u'C', SourceCity=u'Paris', DestinationCity=u'London'),
Row(Route=u'A', SourceCity=u'London', DestinationCity=u'Paris'),
Row(Route=u'B', SourceCity=u'London', DestinationCity=u'Tokyo')]

假设我们有两个包含路线和城市的rdd:

val routes = sc.parallelize(List(("A", 1, 2),("B", 1, 3),("C", 2, 1)))
val citiesByIDRDD = sc.parallelize(List((1, "London"), (2, "Paris"), (3, "Tokyo")))

有几种方法可以实现城市查找。假设城市查找包含很少的项,而路线查找包含很多项。在这种情况下,让我们从收集城市作为驱动程序发送给每个任务的地图开始。

val citiesByID = citiesByIDRDD.collectAsMap
routes.map{r => (r._1, citiesByID(r._2), citiesByID(r._3))}.collect
=> Array[(String, String, String)] = Array((A,London,Paris), (B,London,Tokyo), (C,Paris,London))

为了避免向每个任务发送查找表,而只向工人发送一次,您可以扩展现有代码广播查找映射。

val bCitiesByID = sc.broadcast(citiesByID)
routes.map{r => (r._1, bCitiesByID.value(r._2), bCitiesByID.value(r._3))}.collect
=> Array[(String, String, String)] = Array((A,London,Paris), (B,London,Tokyo), (C,Paris,London))

我不认为这里需要数据帧,但如果你想,你可以:

import sqlContext.implicits._
case class Route(id: String, from: Int, to: Int)
case class City(id: Int, name: String)
val cities = List(City(1, "London"), City(2, "Paris"), City(3, "Tokyo"))
val routes = List(Route("A", 1, 2), Route("B", 1, 3), Route("C", 2, 1))
val citiesDf = cities.df
citiesDf.registerTempTable("cities")
val routesDf = routes.df
citiesDf.registerTempTable("routes")
routesDf.show
+---+----+---+
| id|from| to|
+---+----+---+
|  A|   1|  2|
|  B|   1|  3|
|  C|   2|  1|
+---+----+---+
citiesDf.show
+---+------+
| id|  name|
+---+------+
|  1|London|
|  2| Paris|
|  3| Tokyo|
+---+------+

你提到这是一个简单的问题在SQL,所以我认为你可以从这里拿走它。执行SQL是这样的:

sqlContext.sql ("SELECT COUNT(*) FROM routes")

相关内容

  • 没有找到相关文章