如何在不读取单个数据帧火花的情况下并行处理目录中的所有文件



我有一个用例来读取和处理目录中的所有文件,并在spark中应用一些转换后创建单独的输出文件。在这里,我想在spark中执行并行处理,以将所有所需的转换应用于登录目录中当前存在的文件。

下面是我尝试过的示例代码,但它不起作用。

def fileList = .... //to fetch file names from a directory
def businessLogic() //where I am doing all the operations. (read from a file, transformation, etc)
fileList().map(businessLogic) // calling business logic in parallel

你能告诉我如何实现并行处理吗?

注意:文件数量可以是N,将所有文件读取到数据帧中不是一个选项,因为我必须为每个文件创建输出文件,触发多个火花作业也不是一个选择。

谢谢,Sourav

这里有一个例子,基本上就是斯里尼瓦斯在评论中建议的。

这里的关键是函数input_file_name,它提供原始文件名。

请注意,如果fileList是标准(非分布式(数据结构(如DataFrame/Dataset/RDD(,则像forEach这样的操作和转换是并行执行的。如果你想使用原生Scala来实现并行执行,你可以看看Futures。

// spark is a SparkSession
// input_file_name is spark.sql.functions.input_file_name
val df = spark.read.option("header", "true").option("inferSchema", "true").csv("path/to/data")
val df2 = df.withColumn("input", substring_index(input_file_name(), "/", -1))
df2.write.partitionBy("input").option("header", "true").csv("output")

也许是这样的?

val combinedDF = fileList.map { name => 
spark.read.whatever(name).withColumn("file_name", lit(name)
}.reduce { _ union _ }
val result = applyBusinessLogic(combinedDF).partitionBy("file_name").persist

fileList.foreach { name => 
createOutput(name, result.filter(col('file_name') === name)
}

最新更新