嘿,我有一个典型的要求,其中我必须在循环中创建带有Scala中字符串列表的自定义名称的循环范围。
就像我有字符串say的列表(产品,客户,订单,......),并且此列表可以有n个项目数,n可以在其中任何数字说直到30。在另一个文件中指定列表中的列表。
因此,对于列表中的每个项目,例如产品,我必须创建数据框架名称为产品,然后在以后我需要编写SPARK SQL,将其加入以下列表中的所有项目。
选择product.name,customer.name,order.name从产品加入客户...加入订单...
此加入查询将根据列表中的项目数量动态。我正在考虑从shell脚本创建.scala文件。让我知道您的建议。
也可以从Scala对象创建动态SQL。基于用户输入,首先创建一个RDD。然后根据您的要求创建预期对象的列表,并创建数据框架和对象名称的地图。然后使用循环生成SQL字符串。
嘿,我通过创建一种称为"生成f"的方法来实现这一目标。这将文件列表作为字符串(","分离)和带有架构定义(",")的文件分开,最后是包含数据的文件。
def generateDF(fName: String, schemaFile: String, dataFile: String): Unit = {
// Reading the prod files and creating DataFrame from user defined schema
val SchemaRDD = spark.sparkContext.textFile(schemaFile)
val SchemaString = SchemaRDD.map(_.toString).collect().mkString
val Schema = StructType(SchemaString.split(",").map(column => StructField(column.split(":")(0), inferType(column), true)))
val outDF = spark.read.format("csv")
.option("delimiter", ",").option("quote", "")
.option("header", "false")
.schema(Schema)
.load(dataFile)
outDF.createTempView(fName)
}
//调用源文件中每个表名称的过程
fileListRDD
.flatMap(_.split(",")).collect.toList
.map(file => generateDF(file.mkString.toString, (filePath + file.mkString + ".schema"), (filePath + file.mkString + ".csv")))