我目前正在解决涉及总线的GPS数据的问题。我面临的问题是减少我的流程中的计算。
一张桌子中有大约20亿个GPS坐标点(纬度度),在另一台桌子中,其纬度大约有12,000个公交车站。预计200亿点仅5-10%在公交车站。
问题:我需要标记和提取在公交车站(12,000点)的那些点(在200亿分)中。由于这是GPS数据,因此我不能完全匹配坐标,而是基于公差的地理范围。
问题:标记总线停靠的过程在当前的幼稚方法中需要很长时间。目前,我们正在选择12,000个公交点中的每个点,并以100m的公差查询200亿点(通过将学位差异转换为距离)。
问题:是否有一个算法有效的过程来实现点的标记?
是的,您可以使用诸如空间Spark之类的东西。它仅适用于SPARK 1.6.1,但您可以使用BroadcastSpatialJoin创建一个非常有效的RTree
。
以下是我使用带有pyspark的空间Spark检查不同多边形还是相交的示例:
:from ast import literal_eval as make_tuple
print "Java Spark context version:", sc._jsc.version()
spatialspark = sc._jvm.spatialspark
rectangleA = Polygon([(0, 0), (0, 10), (10, 10), (10, 0)])
rectangleB = Polygon([(-4, -4), (-4, 4), (4, 4), (4, -4)])
rectangleC = Polygon([(7, 7), (7, 8), (8, 8), (8, 7)])
pointD = Point((-1, -1))
def geomABWithId():
return sc.parallelize([
(0L, rectangleA.wkt),
(1L, rectangleB.wkt)
])
def geomCWithId():
return sc.parallelize([
(0L, rectangleC.wkt)
])
def geomABCWithId():
return sc.parallelize([
(0L, rectangleA.wkt),
(1L, rectangleB.wkt),
(2L, rectangleC.wkt)])
def geomDWithId():
return sc.parallelize([
(0L, pointD.wkt)
])
dfAB = sqlContext.createDataFrame(geomABWithId(), ['id', 'wkt'])
dfABC = sqlContext.createDataFrame(geomABCWithId(), ['id', 'wkt'])
dfC = sqlContext.createDataFrame(geomCWithId(), ['id', 'wkt'])
dfD = sqlContext.createDataFrame(geomDWithId(), ['id', 'wkt'])
# Supported Operators: Within, WithinD, Contains, Intersects, Overlaps, NearestD
SpatialOperator = spatialspark.operator.SpatialOperator
BroadcastSpatialJoin = spatialspark.join.BroadcastSpatialJoin
joinRDD = BroadcastSpatialJoin.apply(sc._jsc, dfABC._jdf, dfAB._jdf, SpatialOperator.Within(), 0.0)
joinRDD.count()
results = joinRDD.collect()
map(lambda result: make_tuple(result.toString()), results)
# [(0, 0), (1, 1), (2, 0)] read as:
# ID 0 is within 0
# ID 1 is within 1
# ID 2 is within 0
注意线
joinRDD = BroadcastSpatialJoin.apply(sc._jsc, dfABC._jdf, dfAB._jdf, SpatialOperator.Within(), 0.0)
最后一个参数是一个缓冲区值,在您的情况下,这将是您要使用的公差。如果您使用LAT/LON,这可能是一个很小的数字,因为它是一个径向系统,并且取决于所需的仪表,以便您需要根据LAT/LON来计算您的感兴趣领域。