我正在使用 kafka 主题中的 XML 文件。谁能告诉我如何将XML解析为数据帧。
val df = sqlContext.read
.format("com.databricks.spark.xml")
//.option("rowTag","ns:header")
// .options(Map("rowTag"->"ntfyTrns:payloadHeader","rowTag"->"ns:header"))
.option("rowTag","ntfyTrnsDt:notifyTransactionDetailsReq")
.load("/home/ubuntu/SourceXML.xml")
df.show
df.printSchema()
df.select(col("ns:header.ns:captureSystem")).show()
我能够从XML中精确地获取信息信息。我不知道如何将 RDD[String] 从 kafka 主题传递或转换或加载到 sql read API。
谢谢!
我面临同样的情况,做一些研究,我发现有些人正在使用此方法使用以下代码将RDD转换为数据帧,如下所示:
val wrapped = rdd.map(xml => s"""<a>$xml</a>""")
val df = new XmlReader().xmlRdd(sqlContext, wrapped)
你只需要从DStream获取RDD,我正在使用pyspark来做到这一点。
streamElement = ssc.textFileStream("s3n://your_path")
streamElement.foreachRDD(process)
其中进程方法具有以下结构,因此您可以使用RDD执行所有操作
def process(time, rdd):
return value