Spark checkpointing non-streaming - checkpoint files can be used in subsequent job runs or driver programs



This text comes from an interesting article: http://www.lifeisafile.com/Apache-Spark-Caching-Vs-Checkpointing/

" ...检查点将 rdd 物理存储到 hdfs 并销毁创建它的世系。即使在 Spark 应用程序终止后,检查点文件也不会被删除。检查点文件可用于后续作业运行或驱动程序。RDD 的检查点会导致双重计算,因为该操作将首先调用缓存,然后再执行计算和写入检查点目录的实际工作。...">

I seem to recall reading elsewhere that checkpoint files are only for the jobs, or shared jobs, within a given Spark application.

Seeking clarification, and how a new application could use the checkpoint directory, as I thought that was not possible.

"I seem to recall reading elsewhere that checkpoint files are only for the jobs or shared jobs in a given Spark application."

Spark does not clean the checkpoint directory even after the SparkContext is stopped. We can turn on automatic cleanup by setting the following property:

spark.conf.set("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
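Note that the context cleaner only removes the checkpoint files of RDDs that get garbage-collected, and as far as I can tell the flag is read when the SparkContext is created, so setting it when the session is built is the safer option. A sketch (not part of the original answer):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
  .getOrCreate()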

"Seeking clarification and how a new application could use the checkpoint directory, as I thought that was not possible."

To reuse the checkpointed dataset again, we can follow these steps:

  1. Start context 1 and checkpoint the dataset:
// Setting logger on for ReliableRDDCheckpointData
scala> import org.apache.log4j.{Level, Logger}
scala> Logger.getLogger("org.apache.spark.rdd.ReliableRDDCheckpointData").setLevel(Level.INFO)
// Note application ID
scala> spark.sparkContext.applicationId
res1: String = local-1567969150914
// Set checkpoint Dir
scala> spark.sparkContext.setCheckpointDir("/tmp/spark/checkpoint")
// File system location
Users-Air:checkpoint User$ pwd
/tmp/spark/checkpoint
Users-Air:checkpoint User$ ls -lrth
total 0
drwxr-xr-x  2 User  wheel    64B Sep  8 15:00 7aabcb46-e707-49dd-8893-148a162368d5
// Create Dataframe
scala> val df = spark.range(3).withColumn("random", rand())
scala> df.show
+---+------------------+
| id|            random|
+---+------------------+
|  0|0.8517439782779789|
|  1| 0.288880016535247|
|  2|0.7027831376739603|
+---+------------------+
scala> df.schema
res5: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,false), StructField(random,DoubleType,false))
// Checkpoint the DataFrame
scala> df.checkpoint
19/09/08 15:02:22 INFO ReliableRDDCheckpointData: Done checkpointing RDD 7 to file:/tmp/spark/checkpoint/7aabcb46-e707-49dd-8893-148a162368d5/rdd-7, new parent is RDD 8
res6: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, random: double]
// New RDD saved in checkpoint directory /tmp/spark/checkpoint/7aabcb46-e707-49dd-8893-148a162368d5/rdd-7
Users-Air:7aabcb46-e707-49dd-8893-148a162368d5 User$ cd rdd-7/
Users-Air:rdd-7 User$ ls -lrth
total 32
-rw-r--r--  1 User  wheel     4B Sep  8 15:02 part-00000
-rw-r--r--  1 User  wheel   163B Sep  8 15:02 part-00002
-rw-r--r--  1 User  wheel   163B Sep  8 15:02 part-00001
-rw-r--r--  1 User  wheel   163B Sep  8 15:02 part-00003
// Stop context 
scala> spark.stop
scala> :quit
  2. Start a new context 2 and read the checkpointed dataset:
// Initialized new context
scala> spark.sparkContext.applicationId
res0: String = local-1567969525656

SparkContext.checkpointFile is a protected[spark] method, so we need to create the helper object under the org.apache.spark package:

scala> :paste -raw
// Entering paste mode (ctrl-D to finish)
package org.apache.spark

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

object RecoverCheckpoint {
  def recover[T: ClassTag](sc: SparkContext, path: String): RDD[T] = {
    sc.checkpointFile[T](path)
  }
}

Now use the RecoverCheckpoint object above to recover the checkpointed RDD as an RDD[InternalRow]:

// Path from first context
scala> val checkPointFilePath = "/tmp/spark/checkpoint/7aabcb46-e707-49dd-8893-148a162368d5/rdd-7"
scala> import org.apache.spark.RecoverCheckpoint
scala> import org.apache.spark.sql.catalyst.InternalRow
scala> import org.apache.spark.sql.types._
scala> val RecoveredRDD = RecoverCheckpoint.recover[InternalRow](spark.sparkContext, checkPointFilePath)
// RDD is recovered as RDD[InternalRow]
scala> RecoveredRDD
res2: org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = ReliableCheckpointRDD[0] at recover at <console>:34
// Count matches with original
scala> RecoveredRDD.count
res3: Long = 3

Convert the recovered RDD to a Dataset by creating a RecoverCheckpointRDDToDF object:


// Need to convert RDD[InternalRow] to DataFrame
scala> :paste -raw
// Entering paste mode (ctrl-D to finish)
// Creating a DataFrame from RDD[InternalRow]
package org.apache.spark.sql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType

object RecoverCheckpointRDDToDF {
  // Uses the package-private SparkSession.internalCreateDataFrame,
  // which is why this object lives in package org.apache.spark.sql
  def createDataFrame(spark: SparkSession, catalystRows: RDD[InternalRow], schema: StructType): DataFrame = {
    spark.internalCreateDataFrame(catalystRows, schema)
  }
}

Finally, use RecoverCheckpointRDDToDF to get the Dataset back:

// Schema should be known
scala> val df_schema = StructType(List(StructField("id",LongType,false), StructField("random",DoubleType,false)))
df_schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,false), StructField(random,DoubleType,false))
scala> import org.apache.spark.sql.RecoverCheckpointRDDToDF
scala> val df = RecoverCheckpointRDDToDF.createDataFrame(spark, RecoveredRDD, df_schema)
scala> df.show
+---+------------------+
| id|            random|
+---+------------------+
|  0|0.8517439782779789|
|  1| 0.288880016535247|
|  2|0.7027831376739603|
+---+------------------+
// Same as first context
// Stop context
scala> spark.stop
scala> :quit
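Since the schema has to be known in the second context (as noted above), one option is to save the schema's JSON next to the checkpoint in context 1 and rebuild it in context 2 with DataType.fromJson. A sketch, not part of the original answer; the path is illustrative:

// In context 1, alongside the checkpoint:
import java.nio.file.{Files, Paths}
import java.nio.charset.StandardCharsets
Files.write(Paths.get("/tmp/spark/checkpoint/df_schema.json"),
  df.schema.json.getBytes(StandardCharsets.UTF_8))

// In context 2, instead of hand-writing the StructType:
import org.apache.spark.sql.types.{DataType, StructType}
val schemaJson = new String(
  Files.readAllBytes(Paths.get("/tmp/spark/checkpoint/df_schema.json")),
  StandardCharsets.UTF_8)
val df_schema = DataType.fromJson(schemaJson).asInstanceOf[StructType]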
