I have a CSV file and a JSON file. count.csv has three columns (latitude, longitude, count).
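For reference, count.csv is presumably headerless, since the code below supplies the schema and sets no header option; illustrative rows (not real data) would look like:

6.1395489,52.3107973,3
6.1401178,52.3088457,1

And here is a sample feature from the JSON: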
{
  "type": "Feature",
  "properties": {
    "ID": "15280000000231",
    "TYPES": "Second Class",
    "N2C": "9",
    "NAME": "Century Road"
  },
  "geometry": {
    "type": "LineString",
    "coordinates": [
      [
        6.1395489,
        52.3107973
      ],
      [
        6.1401178,
        52.3088457
      ],
      [
        6.1401126,
        52.3088071
      ]
    ]
  }
}
At the moment my Scala code approximates and matches the latitudes and longitudes, filters the CSV file down to the rows whose lon/lat match the JSON, and writes the matching lat/lon pairs back out as CSV.
What I want instead is to return all the properties from the JSON (ID, TYPES, N2C and NAME) together with the matched lat/lon and count from the CSV, written as a JSON file that keeps the original LineString structure, rather than as CSV.
This is what I have so far:
import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.sql.functions.{explode, udf}

case class ScoredLocation(latitude: Double, longitude: Double, count: Int)

object ScoreFilter extends App {
  val spark = SparkSession.builder.appName("ScoreFilter").getOrCreate()
  import spark.implicits._

  // Two coordinates count as a match when they agree after truncation
  // to four decimal places.
  val Epsilon = 10000
  val DoubleToRoundInt = udf(
    (coord: Double) => (coord * Epsilon).toInt
  )

  val schema = Encoders.product[ScoredLocation].schema
  val route_count = spark.read.schema(schema).format("csv").load("count.csv")
    .withColumn("lat_aprx", DoubleToRoundInt($"latitude"))
    .withColumn("lon_aprx", DoubleToRoundInt($"longitude"))

  // NB: GeoJSON pairs are [lon, lat]; the aliases below assume the CSV
  // uses the same ordering as the JSON pairs.
  val match_route = spark.read.format("json").load("matchroute.json")
    .select(explode($"geometry.coordinates"))
    .select($"col".getItem(0).alias("latitude"), $"col".getItem(1).alias("longitude"))
    .withColumn("lat_aprx", DoubleToRoundInt($"latitude"))
    .withColumn("lon_aprx", DoubleToRoundInt($"longitude"))

  route_count.show()
  match_route.show()

  val result = route_count.join(match_route, Seq("lat_aprx", "lon_aprx"), "leftsemi")
    .select($"latitude", $"longitude", $"count")
  result.show()
  result.write.format("csv").save("result.csv")
}
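To make the approximation concrete, Epsilon = 10000 truncates a coordinate to four decimal places (roughly 11 m of latitude), so two points match when their first four decimals agree (illustrative numbers, not from my data):

(52.3107973 * 10000).toInt  // 523107
(52.3107112 * 10000).toInt  // 523107 -> same bucket, counts as a match
(52.3109000 * 10000).toInt  // 523109 -> different bucket, no match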
EDIT (in response to the answer):
With the suggested solution I get this error:
Exception in thread "main" org.apache.spark.sql.AnalysisException:
cannot resolve '`ID`' given input columns: [count, latitude,
longitude, lat_aprx, lon_aprx];;
'Project [latitude#3, longitude#4, score#5, 'ID, 'TYPES, 'N2C, 'NAME]
+- Project [lat_aprx#10, lon_aprx#16, latitude#3, longitude#4,score#5]
+- Join LeftSemi, ((lat_aprx#10 = lat_aprx#55) && (lon_aprx#16 = lon_aprx#63))
:- Project [latitude#3, longitude#4, score#5, lat_aprx#10, if
(isnull(longitude#4)) null else UDF(longitude#4) AS lon_aprx#16]
: +- Project [latitude#3, longitude#4, count#5, if
(isnull(latitude#3)) null else UDF(latitude#3) AS lat_aprx#10]
: +- Relation[latitude#3,longitude#4,count#5] csv
+- Project [ID#38, TYPES#39, N2C#40, NAME#41, coords#48,
lat_aprx#55, if (isnull(coords#48[1])) null else UDF(coords#48[1]) AS
lon_aprx#63]
+- Project [ID#38, TYPES#39, N2C#40, NAME#41, coords#48, if
(isnull(coords#48[0])) null else UDF(coords#48[0]) AS lat_aprx#55]
+- Project [properties#32.ID AS ID#38,
properties#32.TYPES AS TYPES#39, properties#32.N2C AS N2C#40,
properties#32.NAME AS NAME#41, coords#48]
+- Generate explode(geometry#31.coordinates), true,
false, [coords#48]
+- Relation[geometry#31,properties#32,type#33] json
EDIT 2: I am now getting JSON back with the count added, but the remaining problem is getting it back out as the original GeoJSON, i.e. as a LineString feature with the counts added; a sample of what I currently get is below. It should look more like the original JSON above. I imagine it could be massaged afterwards, but I would prefer this to be a Spark SQL process. Any ideas?
{
  "lat": 5.2509524,
  "lon": 53.3926721,
  "count": 1,
  "ID": "15280000814947",
  "TYPES": "Second Class",
  "N2C": "9"
}{
  "lat": 5.251464,
  "lon": 53.3919782,
  "count": 4,
  "ID": "15280000814947",
  "TYPES": "Second Class",
  "N2C": "9"
}{
  "lat": 5.251674,
  "lon": 53.3916119,
  "count": 4,
  "ID": "15280000814947",
  "TYPES": "Second Class",
  "N2C": "9"
}
When building the match_route DataFrame, make sure you select every column you actually want to keep. For example:
val match_route = spark.read.format("json").load("matchroute.json")
  .select($"properties.*", explode($"geometry.coordinates").as("coords"))
  .withColumn("latitude", $"coords".getItem(0))
  .withColumn("longitude", $"coords".getItem(1))
  .withColumn("lat_aprx", DoubleToRoundInt($"latitude"))
  .withColumn("lon_aprx", DoubleToRoundInt($"longitude"))
  .drop($"coords")
Also make sure to add the relevant columns in the final select, and use an inner join rather than leftsemi: a left semi join keeps only the left-hand side's columns, which is exactly why ID could not be resolved in the error above.
val result = route_count.join(match_route, Seq("lat_aprx", "lon_aprx"), "inner")
  .select($"latitude", $"longitude", $"count", $"ID", $"TYPES", $"N2C", $"NAME")