I am trying to debug an OutOfMemory issue I am seeing in my Spark setup, and at this point I am unable to pin down a concrete reason for it. I always see this issue when writing a DataFrame to Parquet or Kafka. My DataFrame has 5000 rows. Its schema is
root
|-- A: string (nullable = true)
|-- B: string (nullable = true)
|-- C: string (nullable = true)
|-- D: array (nullable = true)
| |-- element: string (containsNull = true)
|-- E: array (nullable = true)
| |-- element: string (containsNull = true)
|-- F: double (nullable = true)
|-- G: array (nullable = true)
| |-- element: double (containsNull = true)
|-- H: integer (nullable = true)
|-- I: double (nullable = true)
|-- J: double (nullable = true)
|-- K: array (nullable = true)
| |-- element: double (containsNull = false)
Cells of column G can be as large as 16MB. The total size of my DataFrame is about 10GB, split into 12 partitions. Before writing, I try to create 48 partitions from it using repartition(), but the issue also appears when I write without repartitioning. At the time of this exception I have only one DataFrame, with a total size of about 10GB. My driver has 19GB of free memory and the two executors have 8GB of free memory each. The Spark version is 2.1.0.cloudera1 and the Scala version is 2.11.8.
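For reference, the write step looks roughly like the sketch below; the DataFrame name and the output path are placeholders, not the actual ones from my job.

// Rough sketch of the write step where the OOM shows up.
// "df" and the output path are placeholders.
val out = df.repartition(48)   // the error also occurs without this repartition
out.write.mode("overwrite").parquet("hdfs:///tmp/example/output")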
I have the following settings:
spark.driver.memory 35G
spark.executor.memory 25G
spark.executor.instances 2
spark.executor.cores 3
spark.driver.maxResultSize 30g
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max 1g
spark.rdd.compress true
spark.rpc.message.maxSize 2046
spark.yarn.executor.memoryOverhead 4096
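For completeness, most of these could also be applied when building the session, as in the sketch below (the application name is a placeholder); spark.driver.memory is omitted there because it only takes effect if set before the driver JVM starts, e.g. via spark-submit or spark-defaults.conf.

import org.apache.spark.sql.SparkSession

// Hypothetical sketch: setting the executor/serializer options programmatically.
// spark.driver.memory is intentionally left out; it must be set before the
// driver JVM starts (spark-submit --conf or spark-defaults.conf).
val spark = SparkSession.builder()
  .appName("parquet-kafka-write-job")   // placeholder application name
  .config("spark.executor.memory", "25g")
  .config("spark.executor.instances", "2")
  .config("spark.executor.cores", "3")
  .config("spark.driver.maxResultSize", "30g")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.kryoserializer.buffer.max", "1g")
  .config("spark.rdd.compress", "true")
  .config("spark.rpc.message.maxSize", "2046")
  .config("spark.yarn.executor.memoryOverhead", "4096")
  .getOrCreate()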
The exception stack trace is:
Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError
at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:991)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:918)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$submitWaitingChildStages$6.apply(DAGScheduler.scala:765)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$submitWaitingChildStages$6.apply(DAGScheduler.scala:764)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at org.apache.spark.scheduler.DAGScheduler.submitWaitingChildStages(DAGScheduler.scala:764)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1228)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1647)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Any insights?
We finally figured out the issue. We were running k-fold logistic regression in Scala on the 5000-row DataFrame, with k set to 4. After the classification we essentially end up with 4 test output DataFrames of roughly 1250 rows each, each partitioned into at least 200 partitions. So on 5000 rows of data we had more than 800 partitions. The code then goes on to repartition this data down to 48 partitions. Our system could not handle this repartitioning, presumably because of the shuffle it triggers. To fix it, we repartitioned each fold's output DataFrame to a smaller number (instead of doing it on the combined DataFrame), and that resolved the issue.
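In code terms the change amounts to something like the sketch below; the names and the per-fold partition count are placeholders rather than our actual values.

import org.apache.spark.sql.DataFrame

// Placeholder sketch of the fix. Before: union all k fold outputs (200+
// partitions each, 800+ in total for 5000 rows) and repartition the combined
// DataFrame to 48, which blew up in the DAG scheduler. After: shrink each
// fold's output right away, then union the already-small DataFrames.
def combineFoldOutputs(foldOutputs: Seq[DataFrame], partitionsPerFold: Int = 12): DataFrame = {
  foldOutputs
    .map(_.repartition(partitionsPerFold))   // repartition per fold, not on the combined result
    .reduce(_.union(_))                      // combined result keeps k * partitionsPerFold partitions
}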