Pyspark无法创建包含1000000行以上的数据框架



我有一个大小为38mb的文件,有1017210行和10列。我在64位windows操作系统和8 GB RAM的独立模式下使用spark。我试图读取csv到pyspark数据框架。首先,我将数据加载为:

    trainRaw = sc.textFile("D:/Rossmann/train/train.csv").map(lambda line:line.split(","))

然后我试图读取数据帧为:

    trainRaw_df = trainRaw.toDF(["Store","DayOfWeek","Date","Sales","Customers","Open","Promo","StateHoliday","SchoolHoliday"]).first()

但是,我得到错误作为:

    16/08/17 10:27:41 INFO SparkContext: Starting job: runJob at PythonRDD.scala:393
    16/08/17 10:27:41 INFO DAGScheduler: Got job 12 (runJob at PythonRDD.scala:393) with 1 output partitions
    16/08/17 10:27:41 INFO DAGScheduler: Final stage: ResultStage 12 (runJob at PythonRDD.scala:393)
    16/08/17 10:27:41 INFO DAGScheduler: Parents of final stage: List()
    16/08/17 10:27:41 INFO DAGScheduler: Missing parents: List()
    16/08/17 10:27:41 INFO DAGScheduler: Submitting ResultStage 12 (PythonRDD[38] at RDD at PythonRDD.scala:43)
    16/08/17 10:27:41 INFO MemoryStore: Block broadcast_19 stored as values in memory (estimated size 5.2 KB
    16/08/17 10:27:41 INFO MemoryStore: Block broadcast_19_piece0 stored as bytes in memory (estimated size 3.3 KB
    16/08/17 10:27:41 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory on localhost:49516 (size: 3.3 KB
    16/08/17 10:27:41 INFO SparkContext: Created broadcast 19 from broadcast at DAGScheduler.scala:1006
    16/08/17 10:27:41 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 12 (PythonRDD[38] at RDD at PythonRDD.scala:43)
    16/08/17 10:27:41 INFO TaskSchedulerImpl: Adding task set 12.0 with 1 tasks
    16/08/17 10:27:41 INFO TaskSetManager: Starting task 0.0 in stage 12.0 (TID 14
    16/08/17 10:27:41 INFO Executor: Running task 0.0 in stage 12.0 (TID 14)
    16/08/17 10:27:41 INFO HadoopRDD: Input split: file:/D:/Rossmann/train/train.csv:0+19028976
    16/08/17 10:27:42 INFO PythonRunner: Times: total = 1328
    16/08/17 10:27:42 ERROR PythonRunner: Python worker exited unexpectedly (crashed)
    java.net.SocketException: Connection reset by peer: socket write error
            at java.net.SocketOutputStream.socketWrite0(Native Method)
            at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
            at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
            at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
            at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
            at java.io.DataOutputStream.write(DataOutputStream.java:107)
            at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
            at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622)
            at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442)
            at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
            at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
            at scala.collection.Iterator$class.foreach(Iterator.scala:727)
            at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
            at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
            at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
            at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
            at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)
    16/08/17 10:27:42 ERROR PythonRunner: This may have been caused by a prior exception:
    java.net.SocketException: Connection reset by peer: socket write error
            at java.net.SocketOutputStream.socketWrite0(Native Method)
            at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
            at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
            at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
            at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
            at java.io.DataOutputStream.write(DataOutputStream.java:107)
            at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
            at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622)
            at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442)
            at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
            at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
            at scala.collection.Iterator$class.foreach(Iterator.scala:727)
            at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
            at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
            at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
            at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
            at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)
    16/08/17 10:27:42 ERROR Executor: Exception in task 0.0 in stage 12.0 (TID 14)
    java.net.SocketException: Connection reset by peer: socket write error
            at java.net.SocketOutputStream.socketWrite0(Native Method)
            at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
            at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
            at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
            at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
            at java.io.DataOutputStream.write(DataOutputStream.java:107)
            at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
            at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622)
            at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442)
            at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
            at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
            at scala.collection.Iterator$class.foreach(Iterator.scala:727)
            at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
            at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
            at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
            at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
            at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)
    16/08/17 10:27:42 WARN TaskSetManager: Lost task 0.0 in stage 12.0 (TID 14
            at java.net.SocketOutputStream.socketWrite0(Native Method)
            at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
            at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
            at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
            at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
            at java.io.DataOutputStream.write(DataOutputStream.java:107)
            at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
            at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622)
            at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442)
            at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
            at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
            at scala.collection.Iterator$class.foreach(Iterator.scala:727)
            at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
            at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
            at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
            at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
            at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)
    16/08/17 10:27:42 ERROR TaskSetManager: Task 0 in stage 12.0 failed 1 times; aborting job
    16/08/17 10:27:42 INFO TaskSchedulerImpl: Removed TaskSet 12.0
    16/08/17 10:27:42 INFO TaskSchedulerImpl: Cancelling stage 12
    16/08/17 10:27:42 INFO DAGScheduler: ResultStage 12 (runJob at PythonRDD.scala:393) failed in 1.454 s
    16/08/17 10:27:42 INFO DAGScheduler: Job 12 failed: runJob at PythonRDD.scala:393
    Traceback (most recent call last):
      File "<stdin>"
      File "D:spark-1.6.1-bin-hadoop2.6pythonpysparkrdd.py"
        rs = self.take(1)
      File "D:spark-1.6.1-bin-hadoop2.6pythonpysparkrdd.py"
        res = self.context.runJob(self
      File "D:spark-1.6.1-bin-hadoop2.6pythonpysparkcontext.py"
        port = self._jvm.PythonRDD.runJob(self._jsc.sc()
      File "D:spark-1.6.1-bin-hadoop2.6pythonlibpy4j-0.9-src.zippy4jjava_gateway.py"
      File "D:spark-1.6.1-bin-hadoop2.6pythonpysparksqlutils.py"
        return f(*a
      File "D:spark-1.6.1-bin-hadoop2.6pythonlibpy4j-0.9-src.zippy4jprotocol.py"
    py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 1 times
    ): java.net.SocketException: Connection reset by peer: socket write error
            at java.net.SocketOutputStream.socketWrite0(Native Method)
            at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
            at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
            at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
            at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
            at java.io.DataOutputStream.write(DataOutputStream.java:107)
            at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
            at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622)
            at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442)
            at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
            at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
            at scala.collection.Iterator$class.foreach(Iterator.scala:727)
            at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
            at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
            at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
            at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
            at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)
    Driver stacktrace:
            at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
            at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
            at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
            at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
            at scala.Option.foreach(Option.scala:236)
            at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
            at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
            at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
            at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:393)
            at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
            at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
            at py4j.Gateway.invoke(Gateway.java:259)
            at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
            at py4j.commands.CallCommand.execute(CallCommand.java:79)
            at py4j.GatewayConnection.run(GatewayConnection.java:209)
            at java.lang.Thread.run(Thread.java:745)
    Caused by: java.net.SocketException: Connection reset by peer: socket write error
            at java.net.SocketOutputStream.socketWrite0(Native Method)
            at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
            at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
            at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
            at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
            at java.io.DataOutputStream.write(DataOutputStream.java:107)
            at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
            at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622)
            at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442)
            at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
            at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
            at scala.collection.Iterator$class.foreach(Iterator.scala:727)
            at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
            at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
            at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
            at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
            at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)
    >>> 16/08/17 10:32:03 INFO BlockManagerInfo: Removed broadcast_17_piece0 on localhost:49516 in memory (size: 3.3 KB
    16/08/17 10:32:03 INFO BlockManagerInfo: Removed broadcast_19_piece0 on localhost:49516 in memory (size: 3.3 KB
    16/08/17 10:32:03 INFO ContextCleaner: Cleaned accumulator 14
    16/08/17 10:32:03 INFO BlockManagerInfo: Removed broadcast_18_piece0 on localhost:49516 in memory (size: 6.1 KB
    16/08/17 10:32:03 INFO ContextCleaner: Cleaned accumulator 13
    16/08/17 10:32:03 INFO ContextCleaner: Cleaned accumulator 12
    16/08/17 10:32:03 INFO BlockManagerInfo: Removed broadcast_16_piece0 on localhost:49516 in memory (size: 3.7 KB
    16/08/17 10:32:03 INFO ContextCleaner: Cleaned accumulator 11
    16/08/17 10:32:03 INFO BlockManagerInfo: Removed broadcast_14_piece0 on localhost:49516 in memory (size: 3.7 KB
    16/08/17 10:32:03 INFO ContextCleaner: Cleaned accumulator 10

我增加了工作内存并更改了JAVA_OPTS,如下所示:

    export  SPARK_MASTER_IP=127.0.0.1
    export  SPARK_LOCAL_IP=127.0.0.1
    export  SPARK_WORKER_MEMORY=6g"
    export  SPARK_MEM=6g"
    export  SPARK_DAEMON_MEMORY=6g"
    export  SPARK_JAVA_OPTS=""-Dspark.executor.memory=6g -Dspark.storage.memoryFraction=0.66 -Dspark.serializer=org.apache.spark.serializer.JavaSerializer -Dspark.executor.memory=6g -Dspark.locality.wait=60000000"
    export  JAVA_OPTS=""-Xms6G -Xmx6G"""

但是没有任何帮助。请建议我如何处理这种类型的内存问题

是否trainRaw.show()也失败?如果没有,请尝试传递带有类型的适当模式,这会使操作更容易。

from pyspark.sql.types import *
schema = StructType([StructField('art_Store', StringType(), True),
...
print trainRaw.toDF(schema).first()

相关内容

  • 没有找到相关文章

最新更新