How to call FileSystem from a UDF



What I expect

The goal is to add a column with the file modification time to every DataFrame row.

Given

val data = spark.read.parquet("path").withColumn("input_file_name", input_file_name())
+----+------------------------+
| id |        input_file_name |
+----+------------------------+
|  1 | hdfs://path/part-00001 |
|  2 | hdfs://path/part-00001 |
|  3 | hdfs://path/part-00002 |
+----+------------------------+

Expected

+----+------------------------+
| id |      modification_time |
+----+------------------------+
|  1 | 2000-01-01Z00:00+00:00 |
|  2 | 2000-01-01Z00:00+00:00 |
|  3 | 2000-01-02Z00:00+00:00 |
+----+------------------------+

What I tried

I wrote a function to get the modification time:

import org.apache.hadoop.fs.{FileSystem, Path}

def getModificationTime(path: String): Long = {
  FileSystem.get(spark.sparkContext.hadoopConfiguration)
    .getFileStatus(new Path(path))
    .getModificationTime()
}

val modificationTime = getModificationTime("hdfs://srsdev/projects/khajiit/data/OfdCheques2/date=2020.02.01/part-00002-04b9e4c8-5916-4bb2-b9ff-757f843a0142.c000.snappy.parquet")

modificationTime: Long = 1580708401253

...but it doesn't work in a query:

def input_file_modification_time = udf((path: String) => getModificationTime(path))
data.select(input_file_modification_time($"input_file_name") as "modification_time").show(20, false)

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 54.0 failed 4 times, most recent failure: Lost task 0.3 in stage 54.0 (TID 408, srs-hdp-s1.dev.kontur.ru, executor 3): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$input_file_modification_time$1: (string) => bigint)

The problem is that spark is null inside the UDF, because it exists only on the driver. Another problem is that hadoopConfiguration is not serializable, so you can't easily capture it in the UDF. But there is a solution using org.apache.spark.SerializableWritable:

import org.apache.spark.SerializableWritable
import org.apache.hadoop.conf.Configuration

val conf = new SerializableWritable(spark.sparkContext.hadoopConfiguration)

def getModificationTime(path: String, conf: SerializableWritable[Configuration]): Long = {
  org.apache.hadoop.fs.FileSystem.get(conf.value)
    .getFileStatus(new org.apache.hadoop.fs.Path(path))
    .getModificationTime()
}

def input_file_modification_time(conf: SerializableWritable[Configuration]) = udf((path: String) => getModificationTime(path, conf))

data.select(input_file_modification_time(conf)($"input_file_name") as "modification_time").show(20, false)
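This works because SerializableWritable wraps the Configuration (a Hadoop Writable) and serializes it through the Writable mechanism, so each executor gets its own copy via conf.value and can construct a FileSystem locally.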

Note that calling getModificationTime for every row of the DataFrame will have a performance impact.

Modify your code to fetch the file metadata once and store it in files: Map[String,Long], then create a UDF input_file_modification_time that looks the value up in that Map[String,Long].

Please check the code below.

scala> val df = spark.read.format("parquet").load("/tmp/par")
df: org.apache.spark.sql.DataFrame = [id: int]
scala> :paste
// Entering paste mode (ctrl-D to finish)
def getModificationTime(path: String): Long = {
  FileSystem.get(spark.sparkContext.hadoopConfiguration)
    .getFileStatus(new org.apache.hadoop.fs.Path(path))
    .getModificationTime()
}
// Exiting paste mode, now interpreting.
getModificationTime: (path: String)Long
scala> implicit val files = df.inputFiles.flatMap(name => Map(name -> getModificationTime(name))).toMap
files: scala.collection.immutable.Map[String,Long] = Map(file:///tmp/par/part-00000-c6360540-c56d-48c4-8795-05a9c0ac4d18-c000_2.snappy.parquet -> 1588080295000, file:///tmp/par/part-00000-c6360540-c56d-48c4-8795-05a9c0ac4d18-c000_3.snappy.parquet -> 1588080299000, file:///tmp/par/part-00000-c6360540-c56d-48c4-8795-05a9c0ac4d18-c000_4.snappy.parquet -> 1588080302000, file:///tmp/par/part-00000-c6360540-c56d-48c4-8795-05a9c0ac4d18-c000.snappy.parquet -> 1588071322000)
scala> :paste
// Entering paste mode (ctrl-D to finish)
def getTime(fileName: String)(implicit files: Map[String,Long]): Long = {
  files.getOrElse(fileName, 0L)
}
// Exiting paste mode, now interpreting.
getTime: (fileName: String)(implicit files: Map[String,Long])Long
scala> val input_file_modification_time = udf(getTime _)
input_file_modification_time: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,LongType,Some(List(StringType)))
scala> df.withColumn("createdDate",input_file_modification_time(input_file_name)).show
+---+-------------+
| id|  createdDate|
+---+-------------+
|  1|1588080295000|
|  2|1588080295000|
|  3|1588080295000|
|  4|1588080295000|
|  5|1588080295000|
|  6|1588080295000|
|  7|1588080295000|
|  8|1588080295000|
|  9|1588080295000|
| 10|1588080295000|
| 11|1588080295000|
| 12|1588080295000|
| 13|1588080295000|
| 14|1588080295000|
| 15|1588080295000|
| 16|1588080295000|
| 17|1588080295000|
| 18|1588080295000|
| 19|1588080295000|
| 20|1588080295000|
+---+-------------+
only showing top 20 rows

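Note that both approaches above return the modification time as epoch milliseconds (a Long), while the expected output is a timestamp. A minimal sketch of the conversion, reusing the input_file_modification_time UDF from this answer (the column name modification_time matches the question's expected output):

import org.apache.spark.sql.functions.input_file_name

// Epoch milliseconds -> seconds, then cast to a proper timestamp column
df.withColumn("modification_time",
    (input_file_modification_time(input_file_name()) / 1000).cast("timestamp"))
  .show(false)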

All you need is some path on your system (s3://...):

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

val pathOnTheSystem = "s3://some_path"
val hadoopConf = new Configuration()
val uri = new URI(pathOnTheSystem)
val fs = FileSystem.get(uri, hadoopConf)
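With the fs handle you can then query file metadata directly; a short sketch under the same assumptions (s3://some_path is a placeholder):

import org.apache.hadoop.fs.Path

// getFileStatus returns a FileStatus; getModificationTime is epoch milliseconds
val mtime = fs.getFileStatus(new Path(pathOnTheSystem)).getModificationTime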
