如何从kafka读取XML格式的流数据



我正在尝试使用Spark结构化流读取KAFKA主题的XML数据。

我尝试使用Databricks spark-xml软件包,但是我遇到了一个错误,说此软件包不支持流读数。有什么方法可以使用结构化流从Kafka主题中提取XML数据?

我当前的代码:

df = spark 
      .readStream 
      .format("kafka") 
      .format('com.databricks.spark.xml') 
      .options(rowTag="MainElement")
      .option("kafka.bootstrap.servers", "localhost:9092") 
      .option(subscribeType, "test") 
      .load()

错误:

py4j.protocol.Py4JJavaError: An error occurred while calling o33.load.
: java.lang.UnsupportedOperationException: Data source com.databricks.spark.xml does not support streamed reading
        at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:234)
.format("kafka") 
.format('com.databricks.spark.xml') 

最后一个赢得了com.databricks.spark.xml并成为流源(隐藏Kafka作为源)。

在顺序的单词中,以上等效于.format('com.databricks.spark.xml')

正如您所经历的那样,Databricks spark-xml软件包不支持流读(即不能充当流源)。该软件包不是用于流式传输的。

有什么办法可以使用结构化流来从KAFKA主题中提取XML数据?

您将使用标准功能或UDF访问和处理XML。在结构化流中,没有内置支持XML处理至Spark 2.2.0。

无论如何,这应该没什么大不了的。Scala代码看起来如下。

val input = spark.
  readStream.
  format("kafka").
  ...
  load
val values = input.select('value cast "string")
val extractValuesFromXML = udf { (xml: String) => ??? }
val numbersFromXML = values.withColumn("number", extractValuesFromXML('value))
// print XMLs and numbers to the stdout
val q = numbersFromXML.
  writeStream.
  format("console").
  start

另一个可能的解决方案可能是编写您自己的自定义流源,该源将在def getBatch(start: Option[Offset], end: Offset): DataFrame中处理XML格式。应该工作。

import xml.etree.ElementTree as ET
df = spark 
      .readStream 
      .format("kafka") 
      .option("kafka.bootstrap.servers", "localhost:9092") 
      .option(subscribeType, "test") 
      .load()

然后我写了python udf

def parse(s):
  xml = ET.fromstring(s)
  ns = {'real_person': 'http://people.example.com',
      'role': 'http://characters.example.com'}
  actor_el = xml.find("DNmS:actor",ns)
  if(actor_el ):
       actor = actor_el.text
  role_el.find('real_person:role', ns)
  if(role_el):
       role = role_el.text
  return actor+"|"+role

注册此udf

extractValuesFromXML = udf(parse)
   XML_DF= df .withColumn("mergedCol",extractroot("value"))
   AllCol_DF= xml_DF.withColumn("actorName", split(col("mergedCol"), "\|").getItem(0))
        .withColumn("Role", split(col("mergedCol"), "\|").getItem(1))

您不能以这种方式混合格式。KAFKA源以Row的身份加载,其中包括keyvaluetopic等值数量,value列存储有效载荷作为binary类型:

请注意,无法设置以下Kafka参数,而Kafka源或接收器将引发例外:

...

value.deserializer :用ByTearrayDeserializer始终为字节阵列进行了典当化。使用DataFrame操作明确估算值。

解析此内容是用户责任,不能委派给其他数据源。例如,请参见我对如何使用结构化流的Kafka读取JSON格式记录的答案?

对于XML,您可能需要一个UDF(UserDefinedFunction),尽管您可以首先尝试Hive XPath功能。您还应该解码二进制数据。

看起来上述方法有效,但它没有使用传递的模式来解析XML文档。

如果您打印关系模式,它始终是

INFO  XmlToAvroConverter - .convert() : XmlRelation Schema ={} root
 |-- fields: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- nullable: boolean (nullable = true)
 |    |    |-- type: string (nullable = true)
 |-- type: string (nullable = true)

for ex:我正在流式传输XML文档的Kafka主题

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<Book>
<Author>John Doe</Author>
<Title>Test</Title>
<PubishedDate></PublishedDate>
</Book>

这是我必须将XML解析到数据框中的代码

kafkaValueAsStringDF = kafakDF.selectExpr("CAST(key AS STRING) msgKey","CAST(value AS STRING) xmlString")
  var parameters = collection.mutable.Map.empty[String, String]
  parameters.put("rowTag", "Book")
kafkaValueAsStringDF.writeStream.foreachBatch {
          (batchDF: DataFrame, batchId: Long) =>
 val xmlStringDF:DataFrame = batchDF.selectExpr("xmlString")
            xmlStringDF.printSchema()
            val rdd: RDD[String] = xmlStringDF.as[String].rdd

            val relation = XmlRelation(
              () => rdd,
              None,
              parameters.toMap,
              xmlSchema)(spark.sqlContext)

            logger.info(".convert() : XmlRelation Schema ={} "+relation.schema.treeString)
}
        .start()
        .awaitTermination()

当我从文件系统或S3读取相同的XML文档并使用SPARK-XML并按照预期进行架构。

谢谢sateesh

您可以使用SQL内置函数xpath,也可以从Kafka消息的 value 中提取数据。/p>

给定一个嵌套的XML

<root>
  <ExecutionTime>20201103153839</ExecutionTime>
  <FilterClass>S</FilterClass>
  <InputData>
    <Finance>
      <HeaderSegment>
        <Version>6</Version>
        <SequenceNb>1</SequenceNb>
      </HeaderSegment>
    </Finance>
  </InputData>
</root>

然后,您可以在selectExpr Statment中使用这些SQL功能如下:

df.readStream.format("kafka").options(...).load()
  .selectExpr("CAST(value AS STRING) as value")
  .selectExpr(
    "xpath(value, '/CofiResults/ExecutionTime/text()') as ExecutionTimeAsArryString",
    "xpath_long(value, '/CofiResults/ExecutionTime/text()') as ExecutionTimeAsLong",
    "xpath_string(value, '/CofiResults/ExecutionTime/text()') as ExecutionTimeAsString",
    "xpath_int(value, '/CofiResults/InputData/Finance/HeaderSegment/Version/text()') as VersionAsInt")

请记住,xpath函数将返回字符串的数组,而您可能会发现将值提取为字符串甚至长度更方便。使用SPARK 3.0.1使用控制台接收器流中以上代码将导致:

+-------------------------+-------------------+---------------------+------------+
|ExecutionTimeAsArryString|ExecutionTimeAsLong|ExecutionTimeAsString|VersionAsInt|
+-------------------------+-------------------+---------------------+------------+
|[20201103153839]         |20201103153839     |20201103153839       |6           |
+-------------------------+-------------------+---------------------+------------+

相关内容

  • 没有找到相关文章

最新更新