我有一个用例来读取和处理目录中的所有文件,并在spark中应用一些转换后创建单独的输出文件。在这里,我想在spark中执行并行处理,以将所有所需的转换应用于登录目录中当前存在的文件。
下面是我尝试过的示例代码,但它不起作用。
def fileList = .... //to fetch file names from a directory
def businessLogic() //where I am doing all the operations. (read from a file, transformation, etc)
fileList().map(businessLogic) // calling business logic in parallel
你能告诉我如何实现并行处理吗?
注意:文件数量可以是N,将所有文件读取到数据帧中不是一个选项,因为我必须为每个文件创建输出文件,触发多个火花作业也不是一个选择。
谢谢,Sourav
这里有一个例子,基本上就是斯里尼瓦斯在评论中建议的。
这里的关键是函数input_file_name
,它提供原始文件名。
请注意,如果fileList
是标准(非分布式(数据结构(如DataFrame/Dataset/RDD(,则像forEach
这样的操作和转换是并行执行的。如果你想使用原生Scala来实现并行执行,你可以看看Futures。
// spark is a SparkSession
// input_file_name is spark.sql.functions.input_file_name
val df = spark.read.option("header", "true").option("inferSchema", "true").csv("path/to/data")
val df2 = df.withColumn("input", substring_index(input_file_name(), "/", -1))
df2.write.partitionBy("input").option("header", "true").csv("output")
也许是这样的?
val combinedDF = fileList.map { name =>
spark.read.whatever(name).withColumn("file_name", lit(name)
}.reduce { _ union _ }
val result = applyBusinessLogic(combinedDF).partitionBy("file_name").persist
fileList.foreach { name =>
createOutput(name, result.filter(col('file_name') === name)
}