我正在尝试使用Spark结构化流读取KAFKA主题的XML数据。
我尝试使用Databricks spark-xml
软件包,但是我遇到了一个错误,说此软件包不支持流读数。有什么方法可以使用结构化流从Kafka主题中提取XML数据?
我当前的代码:
df = spark
.readStream
.format("kafka")
.format('com.databricks.spark.xml')
.options(rowTag="MainElement")
.option("kafka.bootstrap.servers", "localhost:9092")
.option(subscribeType, "test")
.load()
错误:
py4j.protocol.Py4JJavaError: An error occurred while calling o33.load.
: java.lang.UnsupportedOperationException: Data source com.databricks.spark.xml does not support streamed reading
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:234)
.format("kafka") .format('com.databricks.spark.xml')
最后一个赢得了com.databricks.spark.xml
并成为流源(隐藏Kafka作为源)。
在顺序的单词中,以上等效于.format('com.databricks.spark.xml')
。
正如您所经历的那样,Databricks spark-xml
软件包不支持流读(即不能充当流源)。该软件包不是用于流式传输的。
有什么办法可以使用结构化流来从KAFKA主题中提取XML数据?
您将使用标准功能或UDF访问和处理XML。在结构化流中,没有内置支持XML处理至Spark 2.2.0。
无论如何,这应该没什么大不了的。Scala代码看起来如下。
val input = spark.
readStream.
format("kafka").
...
load
val values = input.select('value cast "string")
val extractValuesFromXML = udf { (xml: String) => ??? }
val numbersFromXML = values.withColumn("number", extractValuesFromXML('value))
// print XMLs and numbers to the stdout
val q = numbersFromXML.
writeStream.
format("console").
start
另一个可能的解决方案可能是编写您自己的自定义流源,该源将在def getBatch(start: Option[Offset], end: Offset): DataFrame
中处理XML格式。应该工作。
import xml.etree.ElementTree as ET
df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option(subscribeType, "test")
.load()
然后我写了python udf
def parse(s):
xml = ET.fromstring(s)
ns = {'real_person': 'http://people.example.com',
'role': 'http://characters.example.com'}
actor_el = xml.find("DNmS:actor",ns)
if(actor_el ):
actor = actor_el.text
role_el.find('real_person:role', ns)
if(role_el):
role = role_el.text
return actor+"|"+role
注册此udf
extractValuesFromXML = udf(parse)
XML_DF= df .withColumn("mergedCol",extractroot("value"))
AllCol_DF= xml_DF.withColumn("actorName", split(col("mergedCol"), "\|").getItem(0))
.withColumn("Role", split(col("mergedCol"), "\|").getItem(1))
您不能以这种方式混合格式。KAFKA源以Row
的身份加载,其中包括key
,value
和topic
等值数量,value
列存储有效载荷作为binary
类型:
请注意,无法设置以下Kafka参数,而Kafka源或接收器将引发例外:
...
value.deserializer :用ByTearrayDeserializer始终为字节阵列进行了典当化。使用DataFrame操作明确估算值。
解析此内容是用户责任,不能委派给其他数据源。例如,请参见我对如何使用结构化流的Kafka读取JSON格式记录的答案?
对于XML,您可能需要一个UDF(UserDefinedFunction
),尽管您可以首先尝试Hive XPath功能。您还应该解码二进制数据。
看起来上述方法有效,但它没有使用传递的模式来解析XML文档。
如果您打印关系模式,它始终是
INFO XmlToAvroConverter - .convert() : XmlRelation Schema ={} root
|-- fields: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- nullable: boolean (nullable = true)
| | |-- type: string (nullable = true)
|-- type: string (nullable = true)
for ex:我正在流式传输XML文档的Kafka主题
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<Book>
<Author>John Doe</Author>
<Title>Test</Title>
<PubishedDate></PublishedDate>
</Book>
这是我必须将XML解析到数据框中的代码
kafkaValueAsStringDF = kafakDF.selectExpr("CAST(key AS STRING) msgKey","CAST(value AS STRING) xmlString")
var parameters = collection.mutable.Map.empty[String, String]
parameters.put("rowTag", "Book")
kafkaValueAsStringDF.writeStream.foreachBatch {
(batchDF: DataFrame, batchId: Long) =>
val xmlStringDF:DataFrame = batchDF.selectExpr("xmlString")
xmlStringDF.printSchema()
val rdd: RDD[String] = xmlStringDF.as[String].rdd
val relation = XmlRelation(
() => rdd,
None,
parameters.toMap,
xmlSchema)(spark.sqlContext)
logger.info(".convert() : XmlRelation Schema ={} "+relation.schema.treeString)
}
.start()
.awaitTermination()
当我从文件系统或S3读取相同的XML文档并使用SPARK-XML并按照预期进行架构。
谢谢sateesh
您可以使用SQL内置函数xpath
,也可以从Kafka消息的 value 中提取数据。/p>
给定一个嵌套的XML
<root>
<ExecutionTime>20201103153839</ExecutionTime>
<FilterClass>S</FilterClass>
<InputData>
<Finance>
<HeaderSegment>
<Version>6</Version>
<SequenceNb>1</SequenceNb>
</HeaderSegment>
</Finance>
</InputData>
</root>
然后,您可以在selectExpr
Statment中使用这些SQL功能如下:
df.readStream.format("kafka").options(...).load()
.selectExpr("CAST(value AS STRING) as value")
.selectExpr(
"xpath(value, '/CofiResults/ExecutionTime/text()') as ExecutionTimeAsArryString",
"xpath_long(value, '/CofiResults/ExecutionTime/text()') as ExecutionTimeAsLong",
"xpath_string(value, '/CofiResults/ExecutionTime/text()') as ExecutionTimeAsString",
"xpath_int(value, '/CofiResults/InputData/Finance/HeaderSegment/Version/text()') as VersionAsInt")
请记住,xpath
函数将返回字符串的数组,而您可能会发现将值提取为字符串甚至长度更方便。使用SPARK 3.0.1使用控制台接收器流中以上代码将导致:
+-------------------------+-------------------+---------------------+------------+
|ExecutionTimeAsArryString|ExecutionTimeAsLong|ExecutionTimeAsString|VersionAsInt|
+-------------------------+-------------------+---------------------+------------+
|[20201103153839] |20201103153839 |20201103153839 |6 |
+-------------------------+-------------------+---------------------+------------+