Get data from an avro file for each row of a Spark dataframe column



I am trying to process a column in my dataframe and, for each entry, retrieve a metric from the corresponding avro file.

Basically, I want to do the following:

  1. Read each row of the Path column, which is the path to an avro file
  2. Read that avro file as a dataframe and get the accuracy metric, which comes as a Struct
  3. Create a new column called Accuracy that holds the accuracy metric

This can also be thought of as applying spark.read.format("com.databricks.spark.avro").load(avro_path), but once for each row of the Path column. This is my input dataframe:

+----------+-----+--------------------------+
|timestamp |Model|         Path             |
+----------+-----+--------------------------+
|11:02     |Vgg  |projects/Vgg/results.avro |
|18:31     |Dnet |projects/Dnet/results.avro|
|15:54     |Rnet |projects/Rnet/results.avro|
|12:19     |ViT  |projects/ViT/results.avro |
+----------+-----+--------------------------+

I would like this to be my output dataframe:

+----------+-----+--------------------------+-----------+
|timestamp |Model|         Path             | Accuracy  |
+----------+-----+--------------------------+-----------+
|11:02     |Vgg  |projects/Vgg/results.avro |   0.72    | 
|18:31     |Dnet |projects/Dnet/results.avro|   0.78    |
|15:54     |Rnet |projects/Rnet/results.avro|   0.75    |
|12:19     |ViT  |projects/ViT/results.avro |   0.82    |
+----------+-----+--------------------------+-----------+

I have tried using a udf, but I am guessing you cannot load a dataframe inside a udf (the SparkSession is not available where the udf runs, which might explain the NullPointerException below).

import org.apache.spark.sql.functions.{col, udf}

// Attempt: load the avro file inside the udf and pull out metrics.Accuracy
val get_auc: (String => String) = (avro_path: String) => {
  val auc_avro_file = spark.read.format("com.databricks.spark.avro").load(avro_path)
  val auc = auc_avro_file.select("metrics.Accuracy").first.toString
  auc
}
val auc_udf = udf(get_auc)
val auc_path = models_df.withColumn("Accuracy", auc_udf(col("Path")))

Error:

Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (string) => string)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_1$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:254)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:429)
... 3 more
Caused by: java.lang.NullPointerException
at $anonfun$1.apply(<console>:49)
at $anonfun$1.apply(<console>:46)
... 20 more

Is there another way to do this, for example with a map or a for loop?
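
For what it's worth, this is roughly the for-loop version I have in mind, though I am not sure it is the right approach. It is only a sketch and assumes models_df is the input dataframe above, that it is small enough to collect its paths to the driver, and that each results.avro holds a metrics struct with a numeric Accuracy field:

import spark.implicits._   // for .as[String] and .toDF

// Collect the distinct avro paths to the driver, where `spark` can be used
val paths = models_df.select("Path").distinct.as[String].collect()

// Read each avro file and pull out metrics.Accuracy
val accuracies = paths.map { p =>
  val metricsDf = spark.read.format("com.databricks.spark.avro").load(p)
  // assumes Accuracy is a double; adjust the getter to the actual schema
  (p, metricsDf.select("metrics.Accuracy").first.getDouble(0))
}

// Turn the (Path, Accuracy) pairs into a small dataframe and join it back
val accuracyDf = accuracies.toSeq.toDF("Path", "Accuracy")
val result = models_df.join(accuracyDf, Seq("Path"), "left")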

EDIT: I tried using input_file_name based on one of the answers below:

val paths_col = auc_path.select($"Path")  
val avro_paths = paths_col.withColumn("filename", input_file_name()) 

But this gives me, in the new column, the path of a completely different avro file, which is not what I want.

+----------+-----+--------------------------+------------------------------------+
|timestamp |Model|         Path             |       different_output_Path        |
+----------+-----+--------------------------+------------------------------------+
|11:02     |Vgg  |projects/Vgg/results.avro |projects/models/all_model_runs.avro |
|18:31     |Dnet |projects/Dnet/results.avro|projects/models/all_model_runs.avro |
|15:54     |Rnet |projects/Rnet/results.avro|projects/models/all_model_runs.avro |
|12:19     |ViT  |projects/ViT/results.avro |projects/models/all_model_runs.avro |
+----------+-----+--------------------------+------------------------------------+

How can I still get the metrics.Accuracy part of each avro file?

Read the avro files as a dataframe and store each row's file path:

...
val dfWithCol = df.withColumn("filename",input_file_name())
...

Then JOIN appropriately.
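
A sketch of how that could look. It assumes the same spark-avro format as above, that a glob such as projects/*/results.avro matches the files referenced in the Path column, and that input_file_name() returns values comparable to the Path column (in practice it is usually a fully qualified URI, so the paths may need normalizing before the join matches):

import org.apache.spark.sql.functions.{col, input_file_name}

// Read every results.avro in one pass and record which file each row came from
val metricsDf = spark.read
  .format("com.databricks.spark.avro")
  .load("projects/*/results.avro")   // or pass the collected list of paths
  .withColumn("filename", input_file_name())
  .select(col("filename"), col("metrics.Accuracy").as("Accuracy"))

// Join back onto the input dataframe on the file path
val result = models_df
  .join(metricsDf, models_df("Path") === metricsDf("filename"), "left")
  .drop("filename")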
