Spark Scala 每个数据集输出为单行数据帧



我在一个目录中有多个.nt(NTriples(文件。我想读取每个数据集并将其各自的输出值存储在一行数据帧中。

假设我有dataset1.nt,dataset2.nt,...,datasetn.nt。 使用以下代码读取每个数据集时:

val input = "src/main/resources/dataset1.nt"
val triplesRDD = NTripleReader.load(spark, JavaURI.create(input))
//NTripleReader reads .nt file and separates each line of dataset into subject, predicate and object     
/* My code to output number of distinct subjects, predicates and blank subjects in a dataset */

假设 dataset1 给出以下输出:

  • 不同科目数量:xxxx
  • 非重复谓词数:yy
  • 空白科目数:zzz

假设 dataset2 给出以下输出:

  • 不同科目数量:aaaaa
  • 不同谓词的数量:b
  • 空白科目数量:cc

等等...

当我使用以下代码读取目录中的所有文件时:

val input = "src/main/resources/*"
val triplesRDD = NTripleReader.load(spark, JavaURI.create(input))

它给了我以下输出:

  • 不同主题的数量:xxxx+aaaaa+...//添加每个数据集的所有单独值
  • 不同谓词的数量:yy+b+...
  • 空白科目数量:zzz+cc+...

但是,我希望我的输出是这样的:

Distinct Subjects | Distinct Predicates | Blank Subjects
xxxx              | yy                  | zzz
aaaaa             | b                   | cc    
...               | ...                 | ...

请让我知道如何实现我想要的输出。

提前谢谢。

我正在回答我的问题。我希望这对其他人有所帮助

import java.io.File
//import other necessary packages

object abc {
var df1: DataFrame = _
var df2: DataFrame = _         
var df3: DataFrame = _
def main(args: Array[String]):Unit = 
{
//initializing the spark session locally
val spark = SparkSession.builder
.master("local[*]")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.appName("abc")
.getOrCreate()
//    creates a list of all files in a directory:
def getListOfFiles(dir: String):List[File] = 
{
val path = new File("path/to/directory/")
if (path.exists && path.isDirectory) 
{
path.listFiles.filter(_.isFile).toList
} 
else 
{
List[File]()
}
}
val files = getListOfFiles("path/to/directory/")
val input = ""
for (input <- files)
{  
//  println(input)
val triplesRDD = NTripleReader.load(spark, JavaURI.create(input.toString()))
/*code to generate dataframe columns value*/
import spark.implicits._
if(input == files(0))
{
df3 = Seq(
(column1_value, column2_value, column3_value, column4_value, column5_value, column6_value)
).toDF("column1_name", "column2_name", "column3_name", "column4_name", "column5_name", "column6_name")
} 
else
{    
df1 = Seq(
(column1_value, column2_value, column3_value, column4_value, column5_value, column6_value)
).toDF("column1_name", "column2_name", "column3_name", "column4_name", "column5_name", "column6_name")  
df2 = df3.union(df1)
df3 = df2
}
}
df3.show()
// import dataframe to .csv file
df3.coalesce(1).write
.option("header", "true")
.csv("path/to/directory/sample.csv")
spark.stop
}
}

最新更新