使用MaxMind Geo数据激发UDF



我正在尝试使用maxmind雪地库来删除我在数据帧中每个IP上的地理数据。

我们使用的是Spark SQL(Spark版本2.1.0),我在以下类中创建了一个UDF:

class UdfDefinitions @Inject() extends Serializable with StrictLogging {
 sparkSession.sparkContext.addFile("s3n://s3-maxmind-db/latest/GeoIPCity.dat")
 val s3Config = configuration.databases.dataWarehouse.s3
 val lruCacheConst = 20000
 val ipLookups = IpLookups(geoFile = Some(SparkFiles.get(s3Config.geoIPFileName) ),
  ispFile = None, orgFile = None, domainFile = None, memCache = false, lruCache = lruCacheConst)
 def lookupIP(ip: String): LookupIPResult = {
  val loc: Option[IpLocation] = ipLookups.getFile.performLookups(ip)._1
  loc match {
    case None => LookupIPResult("", "", "")
    case Some(x) => LookupIPResult(Option(x.countryName).getOrElse(""), 
   x.city.getOrElse(""), x.regionName.getOrElse(""))
   }
 }
 val lookupIPUDF: UserDefinedFunction = udf(lookupIP _)
}

目的是创建指向UDF外部文件(IPLookups)的指针并在内部使用它,因此不要在每行上打开文件。这会遇到任务错误,没有序列化,当我们在UDF中使用addfiles时,我们会收到太多的文件打开错误(在使用大数据集时,在一个小数据集上确实有效)。

>

此线程显示如何使用RDD来解决问题,但我们想使用Spark SQL。在Spark序列化

中使用MaxMind Geoip

有什么想法吗?谢谢

这里的问题是iplookups不可序列化。然而,它可以从静态文件(FRMO收集的内容)中查找查找,因此您应该能够解决这个问题。我建议您克隆回购,并使iPlookups序列化。然后,为了使其与Spark SQL一起使用,请像您一样将所有内容包裹在课堂上。在主要火花作业中,您可以按照以下内容写一些内容:

val IPResolver = new MySerializableIpResolver()
val resolveIP = udf((ip : String) => IPResolver.resolve(ip))
data.withColumn("Result", resolveIP($"IP"))

如果您没有那么多不同的IP地址,则有另一个解决方案:您可以在驾驶员中完成所有操作。

val ipMap = data.select("IP").distinct.collect
    .map(/* calls to the non serializable IpLookups but that's ok, we are in the driver*/)
    .toMap
val resolveIP = udf((ip : String) => ipMap(ip))
data.withColumn("Result", resolveIP($"IP"))

相关内容

  • 没有找到相关文章

最新更新