Spark:加载多个文件,执行相同的操作并合并到一个dataFrame中



我有很多小的、单独的.txt文件。对于这些文件中的每一个,我都有多行,用一个空格分为两列,start_time和end_time(一个浮点数(。

我想:

  • 加载所有.txt文件
  • 对于每一行,计算一个包含(end_time-start_time(的新列
  • 为每一行添加一个具有文件名的新列
  • 最后,我想获得一个具有以下模式的dataFrame:
+------------+--------------+------------+------------+
|  file_name |   start_time |   end_time |   duration |
+------------+--------------+------------+------------+

我知道我可以简单地为每个文件和每行创建一个循环,并一次向数据帧添加一行,但我想知道是否有更快的方法
我感兴趣的不是做事的顺序,而是最终结果的速度。我看到SparkContext中提供了textFile((wholeTextFiles((等现有函数,但我不知道如何使用它们来实现我想要的功能。

非常感谢任何指导或建议!

(抱歉我英语不好(

更新:

感谢@Shu的帮助,这是我用来解决问题的最后一个代码

from pyspark.sql.functions import split, reverse, input_file_name
original_schema = [StructField("Start", FloatType(), True),
StructField("End", FloatType(), True)]
data_structure = StructType(original_schema)
df = self.spark_session.read.
csv(path=PATH_FILES+'\*.txt', header=False, schema=data_structure, sep='t').
withColumn("Filename", reverse(split(input_file_name(), "/")).getItem(0) ).
withColumn("duration", col("End") - col("Start"))
df.show(20, False)

使用spark.read.csv()读取文件,如果由space分隔的列使用.option("delimiter"," ")

  • 使用input_file_name函数获取文件名

示例:

from pyspark.sql.functions import *
spark.read.option("header",true).
option("delimiter"," ").
csv("<path>").
withColumn("file_name",input_file_name).
withColumn("duration",col("end_time") - col("start_time")).show()

如果行由space分隔,则使用文件中不存在的某些分隔符读取数据。

  • 然后用\s+拆分数据,现在我们将把数据分解成数据帧的行。

  • 使用substring函数提取start_time,end_time并对其进行减法运算以获得持续时间。


spark.read.csv("<file_path>").
withColumn("input",explode(split(col("_c0"),"\s+"))).
withColumn("filename",input_file_name()).
drop("_c0").
show()

UPDATE

Using array index:

spark.read.csv("<file_path>").
withColumn("input",explode(split(col("_c0"),"\s+"))).
withColumn("filename",reverse(split(input_file_name(),'/'))[0]).
drop("_c0").
show()
#or
spark.read.csv("<file_path>").
withColumn("input",explode(split(col("_c0"),"\s+"))).
withColumn("filename",reverse(split(input_file_name(),'/')).getItem(0)).
drop("_c0").
show()

From Spark-2.4+ Using element_at:

spark.read.csv("<file_path>").
withColumn("input",explode(split(col("_c0"),"\s+"))).
withColumn("filename",element_at(split(input_file_name(),'/'),-1)).
drop("_c0").
show()

Scala中的另一种类似方法-使用spark.read.csv((读取文件,分隔符为空格,文件名命名为(假设火花->火花会话已经存在(

val inputDF = spark.read
.option("inferSchema", "true")
.option("delimiter", " ")
.csv("<path>")
.toDF("start_time","end_time")
val output = inputDF
.withColumn("duration", col("end_time") - col("start_time"))
.withColumn("input_file_name", input_file_name())
.withColumn("file_name_splits", split(col("input_file_name"), "/"))
// Getting the last element from the splits using size function
.withColumn("file_name", col("file_name_splits").apply(size(col("file_name_splits")).minus(1)))
.select("file_name", "start_time", "end_time", "duration")
// To show the sample data
output.show(false)

最新更新