我有一个spark作业(在spark 1.3.1中运行)必须遍历几个键(大约42个)并处理作业。下面是程序
的结构- 从地图中获取密钥
- 从hive (hadoop-yarn下面)中获取匹配key的数据帧
- 处理数据
- 将结果写入hive
当我对一个键运行这个时,一切正常。当我使用42个键运行时,我在第12次迭代时遇到内存不足异常。是否有一种方法可以在每次迭代之间清除内存?帮助感激。
这是我正在使用的高级代码。
public abstract class SparkRunnable {
public static SparkContext sc = null;
public static JavaSparkContext jsc = null;
public static HiveContext hiveContext = null;
public static SQLContext sqlContext = null;
protected SparkRunnableModel(String appName){
//get the system properties to setup the model
// Getting a java spark context object by using the constants
SparkConf conf = new SparkConf().setAppName(appName);
sc = new SparkContext(conf);
jsc = new JavaSparkContext(sc);
// Creating a hive context object connection by using java spark
hiveContext = new org.apache.spark.sql.hive.HiveContext(sc);
// sql context
sqlContext = new SQLContext(sc);
}
public abstract void processModel(Properties properties) throws Exception;
}
class ModelRunnerMain(model: String) extends SparkRunnableModel(model: String) with Serializable {
override def processModel(properties: Properties) = {
val dataLoader = DataLoader.getDataLoader(properties)
//loads keys data frame from a keys table in hive and converts that to a list
val keysList = dataLoader.loadSeriesData()
for (key <- keysList) {
runModelForKey(key, dataLoader)
}
}
def runModelForKey(key: String, dataLoader: DataLoader) = {
//loads data frame from a table(~50 col X 800 rows) using "select * from table where key='<key>'"
val keyDataFrame = dataLoader.loadKeyData()
// filter this data frame into two data frames
...
// join them to transpose
...
// convert the data frame into an RDD
...
// run map on the RDD to add bunch of new columns
...
}
}
我的数据帧大小小于1兆。但是我通过选择和连接等创建了几个数据帧。我假设一旦迭代完成,所有这些都会被垃圾收集。
这是我正在运行的配置。
- spark.eventLog。启用:真spark.broadcast.port: 7086
- spark.driver。记忆:12 g spark.shuffle.spill:假
- spark.serializer: org.apache.spark.serializer.KryoSerializer
- spark.storage.memoryFraction: 0.7 spark.executor.cores: 8
- spark.io.compression.codec: lzf spark.shuffle.consolidateFiles:真
- spark.shuffle.service。启用:真spark.master: yarn-client
- spark.executor。实例:8 spark.shuffle.service。端口:7337
- spark.rdd.compress:真spark.executor.memory: 48 g gh/<>
- spark.executor。id: spark.sql.shuffle.partitions: 700
- spark.cores.max: 56
这是我得到的例外。
Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.util.io.ByteArrayChunkOutputStream.allocateNewChunkIfNeeded(ByteArrayChunkOutputStream.scala:66)
at org.apache.spark.util.io.ByteArrayChunkOutputStream.write(ByteArrayChunkOutputStream.scala:55)
at com.ning.compress.lzf.ChunkEncoder.encodeAndWriteChunk(ChunkEncoder.java:264)
at com.ning.compress.lzf.LZFOutputStream.writeCompressedBlock(LZFOutputStream.java:266)
at com.ning.compress.lzf.LZFOutputStream.write(LZFOutputStream.java:124)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:124)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:202)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:101)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:839)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1042)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply(DAGScheduler.scala:1039)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply(DAGScheduler.scala:1039)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15.apply(DAGScheduler.scala:1039)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15.apply(DAGScheduler.scala:1038)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1038)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1390)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
使用checkpoint()或localCheckpoint()可以减少spark沿路,并在迭代中提高应用程序的性能。