I am trying to parse an XML file using Scala's XML.load. However, the program cannot read its input from HDFS; it only reads from the local filesystem.
Can someone help me read the input data from HDFS?
I tried the following program:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.collection.mutable.WrappedArray
import scala.collection.immutable.HashMap
object ProcessxmlInputFiles {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("Parse XML Data").setMaster("local[*]"))
    val rawRDD = xml.XML.load(args(0))
    rawRDD.child.foreach { x =>
      val dateTime = StringBuilder.newBuilder
      x.child.foreach { x =>
        if ("header".equals(x.label)) {
          dateTime.append(x.child(1).attribute("dateTime").get.toString())
        }
        ...
      }
      ...
    }
    ...
    sc.stop
  }
}
Thanks in advance!!
You can use the databricks spark-xml library for XML. scala.xml.XML.load treats its string argument as a local path or standard URL, so it cannot see files in HDFS; spark-xml reads paths through Hadoop's filesystem layer, so hdfs:// paths work. (Make sure the com.databricks:spark-xml artifact for your Scala version is on the classpath.)
//imports
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StructType, StructField, StringType}
// define the schema for the xml
val sqlContext = new SQLContext(sc)
val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("column1", StringType, nullable = true),
  StructField("column2", StringType, nullable = true)))
//read xml file
val df = sqlContext.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "item")
  .schema(customSchema)
  .load("file.xml") // for a local file use "file:///<path to your xml>", for HDFS use "hdfs://<path to file>"
//write the result
val selectedData = df.select("column1", "_id")
selectedData.write
  .format("com.databricks.spark.xml")
  .option("rootTag", "items")
  .option("rowTag", "item")
  .save("newfile.xml")