Apache Spark 广播变量给出映射分区错误



我是python和Spark的新手,在这里我正在尝试广播Spark Rtree Index。当我尝试使用mapPartitions功能广播索引时,它出现以下错误

在视窗上:

文件 "avlClass.py",第 42 行,在 avlFileLine 中

for j in bv.intersection([x_meters-buffer_size,y_meters-buffer_size,x_meters
buffer_size,y_meters+buffer_size]):
File "C:Python27ArcGIS10.3libsite-packagesrtreeindex.py", line 440, in in
tersection p_mins, p_maxs = self.get_coordinate_pointers(coordinates)
File "C:Python27ArcGIS10.3libsite-packagesrtreeindex.py", line 294, in ge
t_coordinate_pointers
dimension = self.properties.dimension
File "C:Python27ArcGIS10.3libsite-packagesrtreeindex.py", line 883, in ge
t_dimension
return core.rt.IndexProperty_GetDimension(self.handle)
indowsError: exception: access violation reading 0x00000004
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala
166)

在 Linux 中:

ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:196)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
at java.io.DataInputStream.readInt(DataInputStream.java:387)

文件 : avlClass.py

import fiona
from shapely.geometry import shape,Point, LineString, Polygon
from shapely.ops import transform
from rtree import index
from numpy import math
import os
import pyproj
from functools import partial
from pyspark import SparkContext, SparkConf
class avlClass(object):
    def __init__(self,name):
        self.name=name
    def create_index(self):
        # Read the ESRI Shape File
        shapeFileName='C:\shapefiles\Road.shp'
        polygons= [ pol for pol in fiona.open(shapeFileName,'r') ]
        p=index.Property()
        p.dimension=2
        self_idx=index.Index(property=p)
        # Create Index Entries
        for pos,features in enumerate(polygons):
           self_idx.insert(pos,LineString(features['geometry']  ['coordinates']).bounds )
        return self_idx

    def avlFileLine(self,iter,bv):
      for line in iter:
            splits =line.split(',')
            lat= float(splits[2])
            long= float(splits[3])
            print  lat,long
            x='No'
            # Test the index from broadcast Variable bv
           buffer_size=10
            x_meters=-9511983.32151
            y_meters=4554613.80307
            for j in bv.intersection([x_meters-buffer_size,y_meters-buffer_size,x_meters+buffer_size,y_meters+buffer_size]):
                x= "FOUND"
            yield lat,long,heading_radians,x

文件 : avlSpark.py

import fiona
from shapely.geometry import shape,Point, LineString, Polygon
from shapely.ops import transform
from rtree import index
from numpy import math
import os
import pyproj
from functools import partial
from pyspark import SparkContext, SparkConf
from avlClass import avlClass
if __name__ == '__main__':
    conf = SparkConf().setAppName('AVL_Spark_Job')
    conf = SparkConf().setMaster('local[*]')
    sc= SparkContext(conf=conf)
    sc.addPyFile("avlClass.py")
    test_avlClass=avlClass("Test")
    print test_avlClass.name
    idx= test_avlClass.create_index()
    # Test the created index
    buffer_size=10
    x_meters=-9511983.32151
    y_meters=4554613.80307
    for j in idx.intersection([x_meters-buffer_size,y_meters-buffer_size,x_meters+buffer_size,y_meters+buffer_size]):
         print "FOUND"  # Index Worked
    # broadcast Index for Partitions
    idx2=sc.broadcast(idx)

    FileName='c:\test\file1.txt'
    avlFile=sc.textFile(FileName).mapPartitions(lambda line: test_avlClass.avlFileLine(line,idx2.value))
    for line in avlFile.take(10):
     print line

我看到的是你正在创建一个广播变量:

# broadcast Index for Partitions
idx2=sc.broadcast(idx)

但随后将其 .value 传递到 AvlFileLine:

avlFile=sc.textFile(FileName).mapPartitions(lambda line: test_avlClass.avlFileLine(line,idx2.value))

但 idx 和 idx2 都不是 RDD,idx2 作为一个广播变量,将采用 IDX 的任何类别。(我实际上是根据你的问题问这个问题:)

您仍然将该传递的参数视为广播变量,但随后尝试将其视为RDD,大概是PythonRDD,如前所述,它不是。广播变量不是RDD,它只是您分配给它的任何类型。此外,您正在将其值(使用.value())传递到AVLFileLine中。

所以当你在它上面调用 intersection() 时,它会爆炸。我很惊讶它没有更好地爆炸,但我在 Java 中工作,编译器会抓住这一点,我假设在 Python 中解释器只是愉快地运行,直到它到达一个错误的内存位置,你得到那个丑陋的错误消息:)

我认为最好的方法是从一开始就重新考虑你的代码,它只是没有正确使用Spark。我不太了解你的具体应用程序,所以我最好的猜测是你需要放弃intersection(),而是再次查看Python的Spark文档的RDD编程指南部分。找到一种方法将value idx2应用于avlfile,这是一个RDD。你需要避免传递的函数内的任何for循环,Spark通过应用你传递给RDD的每个元素的任何函数来为你做"for"循环。请记住,结果将是另一个RDD。

伪Java代码中,它看起来像:

 SomeArray theArray = avlfile.map({declare inline or call function}).collect(<if the RDD is not too big for collect>) 

我希望这有所帮助,如果你还没有这样做,一本很棒的书是O'Reilly的Learning Spark和这本书的示例代码,这是Apache Spark Docs的下一步。Learning Spark 可以以低于 10 美元的价格租用,就我而言,我作为大学生通过 Safari Books 免费获得它。

如果您不习惯从函数式编程的角度思考,那么编写 Spark 程序的学习曲线很陡峭,我并不比您领先那么远,恕我直言,您只是不太了解 Spark 编程模型。我希望这一切都有所帮助。


同样如本答案的原始编辑中所述,您对 SparkConf 的调用是错误的,我不得不在文档 (.9) 中返回一种方法来查找示例,但您想要这样的东西:

from pyspark import SparkConf, SparkContext
conf = (SparkConf()
         .setMaster("local")
         .setAppName("My app")
         .set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)

从独立程序部分...现在我相信你的第二个任务 conf 覆盖了第一个。


总结:我不明白你会在广播变量上调用RDD函数,广播变量不是RDD,而只是一种数据结构,就像一个全局,由所有工作线程读取(而不是写入)。根据 Scala 中的广播类

来自广播变量的文档:

>>> broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.broadcast.Broadcast object at 0x102789f10>
>>> broadcastVar.value
[1, 2, 3]

在我看来,在bv(不是RDD)上调用intersection()没有意义。

最新更新