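I have a Spark DataFrame with a column holding a location_string. I essentially want to decompose it into three columns called country, region, and city. I then want to merge those columns with the existing country, region, and city columns so that NULL values get filled in. In other words, I want to apply my function to the rows where city, region, or country is NULL and try to fill those values from location_string.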
Example dataset:
+--------------------+-----------------+------+-------+
|     location_string|             city|region|country|
+--------------------+-----------------+------+-------+
|Jonesboro, AR, US...|             NULL|    AR|   NULL|
|Lake Village, AR,...|     Lake Village|    AR|    USA|
|Little Rock, AR, ...|      Little Rock|    AR|    USA|
|Little Rock, AR, ...|      Little Rock|    AR|    USA|
|Malvern, AR, US, ...|          Malvern|  NULL|    USA|
|Malvern, AR, US, ...|          Malvern|    AR|    USA|
|Morrilton, AR, US...|        Morrilton|    AR|    USA|
|Morrilton, AR, US...|        Morrilton|    AR|    USA|
|N. Little Rock, A...|North Little Rock|    AR|    USA|
|N. Little Rock, A...|North Little Rock|    AR|    USA|
|Ozark, AR, US, 72949|            Ozark|    AR|    USA|
|Ozark, AR, US, 72949|            Ozark|    AR|    USA|
|Palestine, AR, US...|             NULL|    AR|    USA|
|Pine Bluff, AR, U...|       Pine Bluff|    AR|   NULL|
|Pine Bluff, AR, U...|       Pine Bluff|    AR|    USA|
|Prescott, AR, US,...|         Prescott|    AR|    USA|
|Prescott, AR, US,...|         Prescott|    AR|    USA|
|Searcy, AR, US, 7...|           Searcy|    AR|    USA|
|Searcy, AR, US, 7...|           Searcy|    AR|    USA|
|West Memphis, AR,...|     West Memphis|  NULL|    USA|
+--------------------+-----------------+------+-------+
Example function for decomposing the location string:
import geocoder

def geocoder_decompose_location(location_string):
    # Nothing to geocode: return an all-None result.
    if not location_string:
        return {'country': None, 'state': None, 'city': None}
    GOOGLE_GEOCODE_API_KEY = "<API KEY HERE>"
    result = geocoder.google(location_string, key=GOOGLE_GEOCODE_API_KEY)
    return {'country': result.country, 'state': result.state, 'city': result.city}
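The fill step described in the question could be sketched in plain Python as follows. This is a minimal sketch under assumptions, not the asker's code: `fill_missing` and `fake_decompose` are hypothetical names, and `fake_decompose` is an offline stand-in for `geocoder_decompose_location` that just splits on commas so the example runs without an API key. The sketch also assumes the `state` key of the decomposed result maps onto the `region` column.

```python
from typing import Callable, Dict, Optional

Row = Dict[str, Optional[str]]
TARGETS = ("city", "region", "country")


def fill_missing(row: Row, decompose: Callable[[Optional[str]], Row]) -> Row:
    """Return a copy of `row` with NULL city/region/country filled from
    the decomposed location_string. Existing values are never overwritten."""
    # Skip the (potentially expensive) lookup when nothing is missing.
    if all(row.get(col) is not None for col in TARGETS):
        return dict(row)
    parts = decompose(row.get("location_string"))
    # Assumption: the decomposer's 'state' corresponds to the 'region' column.
    candidates = {
        "city": parts.get("city"),
        "region": parts.get("state"),
        "country": parts.get("country"),
    }
    filled = dict(row)
    for col in TARGETS:
        if filled.get(col) is None:
            filled[col] = candidates[col]
    return filled


def fake_decompose(location_string: Optional[str]) -> Row:
    # Hypothetical offline stand-in for geocoder_decompose_location.
    if not location_string:
        return {"country": None, "state": None, "city": None}
    parts = [p.strip() for p in location_string.split(",")] + [None, None, None]
    city, state, country = parts[:3]
    return {"country": country, "state": state, "city": city}


# Hypothetical row built from a location string that appears in the sample data.
row = {"location_string": "Ozark, AR, US, 72949",
       "city": None, "region": "AR", "country": None}
print(fill_missing(row, fake_decompose))
```

On the DataFrame side, the same keep-existing-else-fallback rule is what `pyspark.sql.functions.coalesce` expresses, so one option would be to produce the decomposed columns with a UDF and coalesce them with the existing ones.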
Scala pseudocode
First, we need to drop all duplicates from the df (this reduces the number of API calls made to the Google service).
import spark.implicits._

case class Data(location_string: String, city: String, region: String, country: String)

val GOOGLE_GEOCODE_API_KEY = "<API KEY HERE>"

// Pseudocode: geocoder.google stands in for whatever geocoding client is used on the JVM.
val cleaner = (location_string: String) => {
  try {
    val result = geocoder.google(location_string, key = GOOGLE_GEOCODE_API_KEY)
    Some(result)
  } catch {
    case error: Exception => println(error); None
  }
}

output.as[Data].dropDuplicates("location_string").map { x =>
  // A blank check (e.g. StringUtils.isBlank) can be added here as well.
  val toCheck = x.city == null || x.country == null || x.region == null
  if (toCheck) {
    val result = cleaner(x.location_string) // Option: None when the lookup failed
    // Use the geocoded value when it is non-null, otherwise keep the existing one.
    val city = result.flatMap(r => Option(r.city)).getOrElse(x.city)
    val country = result.flatMap(r => Option(r.country)).getOrElse(x.country)
    val region = result.flatMap(r => Option(r.state)).getOrElse(x.region)
    // Argument order must match the case class: city, region, country.
    Data(x.location_string, city, region, country)
  } else x
}
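For the Python function from the question, the same two ideas (swallow lookup errors, and avoid calling the API twice for the same string) might look like the sketch below. `make_safe_cached` is a hypothetical helper, not part of the `geocoder` library, and the cache size is an arbitrary assumption. Failed lookups are not cached, because `functools.lru_cache` stores only values that were returned, not exceptions that were raised.

```python
from functools import lru_cache
from typing import Callable, Dict, Optional

Parts = Dict[str, Optional[str]]
EMPTY: Parts = {"country": None, "state": None, "city": None}


def make_safe_cached(decompose: Callable[[str], Parts],
                     maxsize: int = 10_000) -> Callable[[Optional[str]], Parts]:
    """Wrap `decompose` so that repeated strings are served from a cache
    and any exception yields an all-None result instead of propagating."""
    cached = lru_cache(maxsize=maxsize)(decompose)

    def safe(location_string: Optional[str]) -> Parts:
        if not location_string:
            return dict(EMPTY)
        try:
            # Copy so callers cannot mutate the cached dictionary.
            return dict(cached(location_string))
        except Exception as error:
            print(error)
            return dict(EMPTY)

    return safe
```

A call such as `safe = make_safe_cached(geocoder_decompose_location)` would then be used wherever the raw function was. When run inside Spark, each Python worker process would hold its own cache, so dropping duplicates up front remains the main saving.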
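We can also run orderBy(desc("city"), desc("country"), desc("region")) before dropping duplicates. Descending order places nulls last, so when duplicates exist, the row with null values tends to be the one that is dropped.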
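The intent behind that ordering (keep the most complete row for each location_string) can be illustrated in plain Python. This is only a sketch of the selection rule, with `dedupe_prefer_complete` as a hypothetical name; it is not how Spark implements `dropDuplicates`.

```python
from typing import Dict, Iterable, List, Optional

Row = Dict[str, Optional[str]]
COLS = ("city", "region", "country")


def null_count(row: Row) -> int:
    # Number of NULL values among city, region and country.
    return sum(row.get(col) is None for col in COLS)


def dedupe_prefer_complete(rows: Iterable[Row]) -> List[Row]:
    """Keep one row per location_string, preferring the row with the fewest NULLs."""
    best: Dict[Optional[str], Row] = {}
    for row in rows:
        key = row.get("location_string")
        kept = best.get(key)
        if kept is None or null_count(row) < null_count(kept):
            best[key] = row
    return list(best.values())


# Hypothetical duplicates: the first copy is missing its city.
rows = [
    {"location_string": "Ozark, AR, US, 72949", "city": None, "region": "AR", "country": "USA"},
    {"location_string": "Ozark, AR, US, 72949", "city": "Ozark", "region": "AR", "country": "USA"},
]
print(dedupe_prefer_complete(rows))
```

Rows that are already complete after this step need no geocoding call at all, which is the point of ordering before deduplicating.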