What I expect
The goal is to add a column containing the file modification time to every row of the DataFrame.
Given
val data = spark.read.parquet("path").withColumn("input_file_name", input_file_name())
+----+------------------------+
| id | input_file_name |
+----+------------------------+
| 1 | hdfs://path/part-00001 |
| 2 | hdfs://path/part-00001 |
| 3 | hdfs://path/part-00002 |
+----+------------------------+
Expected
+----+---------------------------+
| id | modification_time         |
+----+---------------------------+
| 1  | 2000-01-01T00:00:00+00:00 |
| 2  | 2000-01-01T00:00:00+00:00 |
| 3  | 2000-01-02T00:00:00+00:00 |
+----+---------------------------+
What I tried
I wrote a function to get the modification time:
import org.apache.hadoop.fs.{FileSystem, Path}

def getModificationTime(path: String): Long = {
  FileSystem.get(spark.sparkContext.hadoopConfiguration)
    .getFileStatus(new Path(path))
    .getModificationTime()
}
val modificationTime = getModificationTime("hdfs://srsdev/projects/khajiit/data/OfdCheques2/date=2020.02.01/part-00002-04b9e4c8-5916-4bb2-b9ff-757f843a0142.c000.snappy.parquet")
modificationTime: Long = 1580708401253
...but it doesn't work in a query:
def input_file_modification_time = udf((path: String) => getModificationTime(path))
data.select(input_file_modification_time($"input_file_name") as "modification_time").show(20, false)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 54.0 failed 4 times, most recent failure: Lost task 0.3 in stage 54.0 (TID 408, srs-hdp-s1.dev.kontur.ru, executor 3): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$input_file_modification_time$1: (string) => bigint)
The problem is that spark is null inside the UDF, because it exists only on the driver. Another problem is that hadoopConfiguration is not serializable, so you cannot simply capture it in the UDF. But there is a solution using org.apache.spark.SerializableWritable:
import org.apache.spark.SerializableWritable
import org.apache.hadoop.conf.Configuration
val conf = new SerializableWritable(spark.sparkContext.hadoopConfiguration)
def getModificationTime(path: String, conf: SerializableWritable[Configuration]): Long = {
  org.apache.hadoop.fs.FileSystem.get(conf.value)
    .getFileStatus(new org.apache.hadoop.fs.Path(path))
    .getModificationTime()
}

def input_file_modification_time(conf: SerializableWritable[Configuration]) = udf((path: String) => getModificationTime(path, conf))

data.select(input_file_modification_time(conf)($"input_file_name") as "modification_time").show(20, false)
Note
Calling getModificationTime for every row of the DataFrame will have a performance impact. Modify your code to fetch the file metadata once, store it in files: Map[String, Long], and create a UDF input_file_modification_time that looks the value up in that Map[String, Long].
Please check the below code.
scala> val df = spark.read.format("parquet").load("/tmp/par")
df: org.apache.spark.sql.DataFrame = [id: int]
scala> :paste
// Entering paste mode (ctrl-D to finish)
def getModificationTime(path: String): Long = {
FileSystem.get(spark.sparkContext.hadoopConfiguration)
.getFileStatus(new org.apache.hadoop.fs.Path(path))
.getModificationTime()
}
// Exiting paste mode, now interpreting.
getModificationTime: (path: String)Long
scala> implicit val files = df.inputFiles.flatMap(name => Map(name -> getModificationTime(name))).toMap
files: scala.collection.immutable.Map[String,Long] = Map(file:///tmp/par/part-00000-c6360540-c56d-48c4-8795-05a9c0ac4d18-c000_2.snappy.parquet -> 1588080295000, file:///tmp/par/part-00000-c6360540-c56d-48c4-8795-05a9c0ac4d18-c000_3.snappy.parquet -> 1588080299000, file:///tmp/par/part-00000-c6360540-c56d-48c4-8795-05a9c0ac4d18-c000_4.snappy.parquet -> 1588080302000, file:///tmp/par/part-00000-c6360540-c56d-48c4-8795-05a9c0ac4d18-c000.snappy.parquet -> 1588071322000)
scala> :paste
// Entering paste mode (ctrl-D to finish)
def getTime(fileName:String)(implicit files: Map[String,Long]): Long = {
files.getOrElse(fileName,0L)
}
// Exiting paste mode, now interpreting.
getTime: (fileName: String)(implicit files: Map[String,Long])Long
scala> val input_file_modification_time = udf(getTime _)
input_file_modification_time: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,LongType,Some(List(StringType)))
scala> df.withColumn("createdDate",input_file_modification_time(input_file_name)).show
+---+-------------+
| id| createdDate|
+---+-------------+
| 1|1588080295000|
| 2|1588080295000|
| 3|1588080295000|
| 4|1588080295000|
| 5|1588080295000|
| 6|1588080295000|
| 7|1588080295000|
| 8|1588080295000|
| 9|1588080295000|
| 10|1588080295000|
| 11|1588080295000|
| 12|1588080295000|
| 13|1588080295000|
| 14|1588080295000|
| 15|1588080295000|
| 16|1588080295000|
| 17|1588080295000|
| 18|1588080295000|
| 19|1588080295000|
| 20|1588080295000|
+---+-------------+
only showing top 20 rows
scala>
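If the map of files is large, it can also be distributed to the executors with a broadcast variable instead of being serialized into the UDF closure with every task. A minimal sketch, assuming the spark session, the df DataFrame, and the files map from the transcript above are in scope (this is not part of the original answer):

```scala
import org.apache.spark.sql.functions.{input_file_name, udf}

// Broadcast the precomputed path -> modification-time map so each
// executor receives one copy instead of one per task.
val filesBc = spark.sparkContext.broadcast(files)

// The UDF only captures the lightweight broadcast handle.
val input_file_modification_time =
  udf((name: String) => filesBc.value.getOrElse(name, 0L))

df.withColumn("createdDate", input_file_modification_time(input_file_name())).show
```

The lookup semantics are the same as getTime above; only the way the map reaches the executors changes.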
All you need is some path on your system (s3://...):
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

val pathOnTheSystem = "s3://some_path"
val hadoopConf = new Configuration()
val uri = new URI(pathOnTheSystem)
val fs = FileSystem.get(uri, hadoopConf)
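The snippet stops at obtaining the FileSystem; a short sketch of how fs could then be used to build the path-to-modification-time map that the earlier answer relies on (assuming pathOnTheSystem points at a directory of parquet files):

```scala
import org.apache.hadoop.fs.Path

// List every file under the directory and record its modification time.
// FileStatus.getModificationTime returns epoch milliseconds.
val times: Map[String, Long] =
  fs.listStatus(new Path(pathOnTheSystem))
    .map(status => status.getPath.toString -> status.getModificationTime)
    .toMap
```

This map can then be captured (or broadcast) by a UDF exactly as in the Map[String, Long] approach above.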