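I have a function, my_function, that contains a for loop. One step inside the loop calls collect() on a DataFrame. It works for the first few iterations, but it always crashes on the fifth one. Does anyone know why this happens?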
  File "my_code.py", line 189, in my_function
    my_df_collect = my_df.collect()
  File "/lib/spark/python/pyspark/sql/dataframe.py", line 280, in collect
    port = self._jdf.collectToPython()
  File "/lib/spark/python/pyspark/traceback_utils.py", line 78, in __exit__
    self._context._jsc.setCallSite(None)
  File "/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 811, in __call__
    answer = self.gateway_client.send_command(command)
  File "/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 624, in send_command
    connection = self._get_connection()
  File "/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 579, in _get_connection
    connection = self._create_connection()
  File "/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 585, in _create_connection
    connection.start()
  File "/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 697, in start
    raise Py4JNetworkError(msg, e)
Py4JNetworkError: An error occurred while trying to connect to the Java server
Another error message appears as well:
Exception happened during processing of request from ('127.0.0.1', 55584)
Traceback (most recent call last):
  File "/anaconda/lib/python2.7/SocketServer.py", line 295, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/anaconda/lib/python2.7/SocketServer.py", line 321, in process_request
    self.finish_request(request, client_address)
  File "/anaconda/lib/python2.7/SocketServer.py", line 334, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/anaconda/lib/python2.7/SocketServer.py", line 655, in __init__
    self.handle()
  File "/lib/spark/python/pyspark/accumulators.py", line 235, in handle
    num_updates = read_int(self.rfile)
  File "/lib/spark/python/pyspark/serializers.py", line 545, in read_int
    raise EOFError
EOFError
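Perhaps the driver JVM ran out of memory. Try giving the driver more memory, or run df.take(10) instead of df.collect() to test whether the problem is the amount of data you are pulling back.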
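To make that test concrete, here is a minimal sketch of swapping collect() for take() inside the loop. The helper name fetch_rows and its limit parameter are hypothetical, not part of PySpark; the only DataFrame methods it relies on are collect() and take(n). For the memory side, one option is to pass the driver memory at launch, for example spark-submit --driver-memory 4g my_code.py, where 4g is only an assumed value to adjust for your machine.

```python
def fetch_rows(df, limit=None):
    """Hypothetical helper: return rows from a DataFrame to the driver.

    With limit=None it behaves like the original code and collects
    every row. With a limit it uses take(), so at most `limit` rows
    are sent back to the Python process.
    """
    if limit is None:
        # Original behaviour: all rows are materialised on the driver.
        return df.collect()
    # Diagnostic behaviour: only the first `limit` rows are returned.
    return df.take(limit)


# Inside my_function's loop, the failing line could temporarily become:
#     my_df_collect = fetch_rows(my_df, limit=10)
# If the fifth iteration now succeeds, the size of the collected
# result is the likely culprit rather than the loop logic itself.
```

If the loop survives with a small limit, the next step would be to reduce what is collected (select fewer columns, aggregate first) or to raise the driver memory as described above.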