I have two sets of files and schemas, and I want to process both in a single piece of code.
Here is my code:
val file_path = "file1" // I want to pass two files (file1, file2)
val rdd = spark.sparkContext.wholeTextFiles(file_path)
val validJsonRdd = rdd.flatMap(_._2.replace(" ", "").replace("\n", "").replace(":value", ":\"value\"").replace("}{", "}\n{").split("\n"))
val dataframe = spark
  .read
  .option("multiLine", true)
  .schema(Schema1) // I want to use schema1 for file1 and schema2 for file2
  .json(validJsonRdd)
  .show()
So, with the code above, I want to run two different schemas against their corresponding files.
You can pass application arguments to spark-submit, as shown below.
General syntax of spark-submit:
./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]
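A concrete invocation following the syntax above might look like this (the jar name, class name, and `local[*]` master are placeholders, not taken from the question):

```shell
./bin/spark-submit \
  --class com.example.JsonLoader \
  --master "local[*]" \
  my-app.jar \
  file1.txt schema1.txt
```

Running it a second time with `file2.txt schema2.txt` handles the second file/schema pair with the same code.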
In place of [application-arguments] you can pass `file1.txt schema1.txt` (or `file2.txt schema2.txt`),
where file1.txt is the first argument and schema1.txt is the second.
In your application code, you can then do something like this:
def main(args: Array[String]): Unit = {
  val inputFile = args(0)
  val schemaFile = args(1)
  val schemaFileAsString = // open a FileInputStream and read the whole schema from **schemaFile** as a string
  val schema = SchemaConverter.convertContent(schemaFileAsString)
  // create the SparkSession and set all its parameters properly
  import spark.implicits._
  val rdd = spark.sparkContext.wholeTextFiles(inputFile)
  val validJsonRdd = rdd.flatMap(_._2.replace(" ", "").replace("\n", "").replace(":value", ":\"value\"").replace("}{", "}\n{").split("\n"))
  spark.read
    .option("multiLine", true)
    .schema(schema)
    .json(validJsonRdd)
    .show()
}
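The read-the-schema-file step left as a comment above could be sketched with plain `scala.io.Source`; the helper name `readSchemaFile` is just for illustration:

```scala
import scala.io.Source

// Hypothetical helper: read the entire schema file into a single string.
def readSchemaFile(path: String): String = {
  val source = Source.fromFile(path)
  try source.mkString
  finally source.close() // always release the file handle
}
```

The resulting string can then be handed to `SchemaConverter.convertContent`.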
References:
https://github.com/zalando-incubator/spark-json-schema
https://spark.apache.org/docs/latest/submitting-applications.html