我正在尝试使用maxmind雪地库来删除我在数据帧中每个IP上的地理数据。
我们使用的是Spark SQL(Spark版本2.1.0),我在以下类中创建了一个UDF:
class UdfDefinitions @Inject() extends Serializable with StrictLogging {
sparkSession.sparkContext.addFile("s3n://s3-maxmind-db/latest/GeoIPCity.dat")
val s3Config = configuration.databases.dataWarehouse.s3
val lruCacheConst = 20000
val ipLookups = IpLookups(geoFile = Some(SparkFiles.get(s3Config.geoIPFileName) ),
ispFile = None, orgFile = None, domainFile = None, memCache = false, lruCache = lruCacheConst)
def lookupIP(ip: String): LookupIPResult = {
val loc: Option[IpLocation] = ipLookups.getFile.performLookups(ip)._1
loc match {
case None => LookupIPResult("", "", "")
case Some(x) => LookupIPResult(Option(x.countryName).getOrElse(""),
x.city.getOrElse(""), x.regionName.getOrElse(""))
}
}
val lookupIPUDF: UserDefinedFunction = udf(lookupIP _)
}
目的是创建指向UDF外部文件(IPLookups)的指针并在内部使用它,因此不要在每行上打开文件。这会遇到任务错误,没有序列化,当我们在UDF中使用addfiles时,我们会收到太多的文件打开错误(在使用大数据集时,在一个小数据集上确实有效)。
>此线程显示如何使用RDD来解决问题,但我们想使用Spark SQL。在Spark序列化
中使用MaxMind Geoip有什么想法吗?谢谢
这里的问题是iplookups不可序列化。然而,它可以从静态文件(FRMO收集的内容)中查找查找,因此您应该能够解决这个问题。我建议您克隆回购,并使iPlookups序列化。然后,为了使其与Spark SQL一起使用,请像您一样将所有内容包裹在课堂上。在主要火花作业中,您可以按照以下内容写一些内容:
val IPResolver = new MySerializableIpResolver()
val resolveIP = udf((ip : String) => IPResolver.resolve(ip))
data.withColumn("Result", resolveIP($"IP"))
如果您没有那么多不同的IP地址,则有另一个解决方案:您可以在驾驶员中完成所有操作。
val ipMap = data.select("IP").distinct.collect
.map(/* calls to the non serializable IpLookups but that's ok, we are in the driver*/)
.toMap
val resolveIP = udf((ip : String) => ipMap(ip))
data.withColumn("Result", resolveIP($"IP"))