I have a 38 MB file with 1,017,210 rows and 10 columns. I am running Spark in standalone mode on 64-bit Windows with 8 GB of RAM, and I am trying to read the CSV into a PySpark DataFrame. First, I loaded the data with:
trainRaw = sc.textFile("D:/Rossmann/train/train.csv").map(lambda line:line.split(","))
Then I tried to convert it to a DataFrame with:
trainRaw_df = trainRaw.toDF(["Store","DayOfWeek","Date","Sales","Customers","Open","Promo","StateHoliday","SchoolHoliday"]).first()
But I get the following error:
16/08/17 10:27:41 INFO SparkContext: Starting job: runJob at PythonRDD.scala:393
16/08/17 10:27:41 INFO DAGScheduler: Got job 12 (runJob at PythonRDD.scala:393) with 1 output partitions
16/08/17 10:27:41 INFO DAGScheduler: Final stage: ResultStage 12 (runJob at PythonRDD.scala:393)
16/08/17 10:27:41 INFO DAGScheduler: Parents of final stage: List()
16/08/17 10:27:41 INFO DAGScheduler: Missing parents: List()
16/08/17 10:27:41 INFO DAGScheduler: Submitting ResultStage 12 (PythonRDD[38] at RDD at PythonRDD.scala:43)
16/08/17 10:27:41 INFO MemoryStore: Block broadcast_19 stored as values in memory (estimated size 5.2 KB
16/08/17 10:27:41 INFO MemoryStore: Block broadcast_19_piece0 stored as bytes in memory (estimated size 3.3 KB
16/08/17 10:27:41 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory on localhost:49516 (size: 3.3 KB
16/08/17 10:27:41 INFO SparkContext: Created broadcast 19 from broadcast at DAGScheduler.scala:1006
16/08/17 10:27:41 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 12 (PythonRDD[38] at RDD at PythonRDD.scala:43)
16/08/17 10:27:41 INFO TaskSchedulerImpl: Adding task set 12.0 with 1 tasks
16/08/17 10:27:41 INFO TaskSetManager: Starting task 0.0 in stage 12.0 (TID 14
16/08/17 10:27:41 INFO Executor: Running task 0.0 in stage 12.0 (TID 14)
16/08/17 10:27:41 INFO HadoopRDD: Input split: file:/D:/Rossmann/train/train.csv:0+19028976
16/08/17 10:27:42 INFO PythonRunner: Times: total = 1328
16/08/17 10:27:42 ERROR PythonRunner: Python worker exited unexpectedly (crashed)
java.net.SocketException: Connection reset by peer: socket write error
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622)
at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)
16/08/17 10:27:42 ERROR PythonRunner: This may have been caused by a prior exception:
java.net.SocketException: Connection reset by peer: socket write error
	... (same stack trace as above)
16/08/17 10:27:42 ERROR Executor: Exception in task 0.0 in stage 12.0 (TID 14)
java.net.SocketException: Connection reset by peer: socket write error
	... (same stack trace as above)
16/08/17 10:27:42 WARN TaskSetManager: Lost task 0.0 in stage 12.0 (TID 14
	... (same stack trace as above)
16/08/17 10:27:42 ERROR TaskSetManager: Task 0 in stage 12.0 failed 1 times; aborting job
16/08/17 10:27:42 INFO TaskSchedulerImpl: Removed TaskSet 12.0
16/08/17 10:27:42 INFO TaskSchedulerImpl: Cancelling stage 12
16/08/17 10:27:42 INFO DAGScheduler: ResultStage 12 (runJob at PythonRDD.scala:393) failed in 1.454 s
16/08/17 10:27:42 INFO DAGScheduler: Job 12 failed: runJob at PythonRDD.scala:393
Traceback (most recent call last):
File "<stdin>"
File "D:spark-1.6.1-bin-hadoop2.6pythonpysparkrdd.py"
rs = self.take(1)
File "D:spark-1.6.1-bin-hadoop2.6pythonpysparkrdd.py"
res = self.context.runJob(self
File "D:spark-1.6.1-bin-hadoop2.6pythonpysparkcontext.py"
port = self._jvm.PythonRDD.runJob(self._jsc.sc()
File "D:spark-1.6.1-bin-hadoop2.6pythonlibpy4j-0.9-src.zippy4jjava_gateway.py"
File "D:spark-1.6.1-bin-hadoop2.6pythonpysparksqlutils.py"
return f(*a
File "D:spark-1.6.1-bin-hadoop2.6pythonlibpy4j-0.9-src.zippy4jprotocol.py"
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 1 times
): java.net.SocketException: Connection reset by peer: socket write error
	... (same stack trace as above)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:393)
at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketException: Connection reset by peer: socket write error
	... (same stack trace as above)
>>> ... (followed by routine BlockManagerInfo and ContextCleaner cleanup messages)
I increased the worker memory and changed JAVA_OPTS as follows:
export SPARK_MASTER_IP=127.0.0.1
export SPARK_LOCAL_IP=127.0.0.1
export SPARK_WORKER_MEMORY=6g"
export SPARK_MEM=6g"
export SPARK_DAEMON_MEMORY=6g"
export SPARK_JAVA_OPTS=""-Dspark.executor.memory=6g -Dspark.storage.memoryFraction=0.66 -Dspark.serializer=org.apache.spark.serializer.JavaSerializer -Dspark.executor.memory=6g -Dspark.locality.wait=60000000"
export JAVA_OPTS=""-Xms6G -Xmx6G"""
But nothing helped. Please suggest how to deal with this kind of memory problem.
Does trainRaw.show() fail as well? If not, try passing a proper schema with column types; that will make the operation much easier.
from pyspark.sql.types import *

schema = StructType([StructField('art_Store', StringType(), True),
                     ...])

print trainRaw.toDF(schema).first()
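For completeness, a minimal runnable version of that sketch. It assumes the PySpark 1.6 shell (where sc and sqlContext already exist) and uses the nine column names from the question, reading every field as a string first:

from pyspark.sql.types import StructType, StructField, StringType

# Column names taken from the question's toDF(...) call; each field is
# loaded as a nullable string and can be cast to numeric types later.
columns = ["Store", "DayOfWeek", "Date", "Sales", "Customers",
           "Open", "Promo", "StateHoliday", "SchoolHoliday"]
schema = StructType([StructField(name, StringType(), True) for name in columns])

trainRaw = sc.textFile("D:/Rossmann/train/train.csv") \
             .map(lambda line: line.split(","))
print trainRaw.toDF(schema).first()

Once this loads cleanly, numeric columns such as Sales and Customers can be cast afterwards, e.g. df.withColumn("Sales", df["Sales"].cast("integer")).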