将列添加到 Spark 数据帧并为其计算值



我有一个CSV文档,我正在加载到包含纬度和经度列的SQLContext中。

val sqlContext = new org.apache.spark.sql.SQLContext(sc);
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").option("delimiter","t").schema(customSchema).load(inputFile);

CSV 示例

metro_code, resolved_lat, resolved_lon
602, 40.7201, -73.2001

我正在尝试找出添加新列并计算每行的 GeoHex 的最佳方法。 使用geohex软件包可以轻松散列经度和经度。 我想我需要运行并行化方法,或者我已经看到一些将函数传递给 withColumn 的示例。

用 UDF 包装所需的函数应该可以解决问题:

import org.apache.spark.sql.functions.udf
import org.geohex.geohex4j.GeoHex
val df = sc.parallelize(Seq(
  (Some(602), 40.7201, -73.2001), (None, 5.7805, 139.5703)
)).toDF("metro_code", "resolved_lat", "resolved_lon")
def geoEncode(level: Int) = udf(
  (lat: Double, long: Double) => GeoHex.encode(lat, long, level))
df.withColumn("code", geoEncode(9)($"resolved_lat", $"resolved_lon")).show
// +----------+------------+------------+-----------+
// |metro_code|resolved_lat|resolved_lon|       code|
// +----------+------------+------------+-----------+
// |       602|     40.7201|    -73.2001|PF384076026|
// |      null|      5.7805|    139.5703|PR081331784|
// +----------+------------+------------+-----------+

相关内容

  • 没有找到相关文章

最新更新