I am trying to use the Scala XML library in Flink to parse XML, but I cannot get it to work. Note that I need to use both the serialized and the unserialized (string) version of the message in the same processing function.
I have already tried a number of different solutions; they always work in IntelliJ, but never when I run them on the Flink cluster. They always throw some variation of java.lang.LinkageError: com/sun/org/apache/xerces/internal/jaxp/SAXParserImpl$JAXPSAXParser; whatever I try, I keep getting errors similar to that one.
Here is an example of my Flink job:
object StreamingJob {

  import org.apache.flink.streaming.api.scala._

  val l = List(
    """<ciao>ciao</ciao>""",
  )

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // set up kafka section excluded
    env.setParallelism(10)

    val stream = env.fromCollection(l)

    stream
      .uid("process")
      .map(new Processor)
      .print

    env.execute("Flink-TEST")
  }
}
And here is an example of my processing function:
import javax.xml.parsers.{SAXParser, SAXParserFactory}

import org.apache.flink.api.common.functions.MapFunction

import scala.xml.{Elem, XML}
import scala.xml.factory.XMLLoader

class Processor extends MapFunction[String, String] {

  override def map(translatedMessage: String): String = {
    val xml = Processor.xmlLoader.loadString(translatedMessage)
    xml.toString
  }
}

object Processor {
  val factory: SAXParserFactory = SAXParserFactory.newInstance
  val SAXParser: SAXParser = factory.newSAXParser
  val xmlLoader: XMLLoader[Elem] = XML.withSAXParser(SAXParser)
}
Finally, here is my pom.xml, which uses the maven-shade plugin to build the jar I submit to Flink:
<!-- other sections of the pom are excluded -->
<properties>
  <flink.version>1.7.0</flink.version>
  <scala.binary.version>2.12</scala.binary.version>
  <scala.version>2.12.8</scala.version>
</properties>
<!-- other sections of the pom are excluded -->
<dependencies>
  <!-- Apache Flink dependencies -->
  <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>
  <!-- Scala Library, provided by Flink as well. -->
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.4</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.7.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.11.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-slf4j-impl</artifactId>
    <version>2.11.1</version>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.4.0</version>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-yaml</artifactId>
    <version>2.4.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api-scala_2.12</artifactId>
    <version>11.0</version>
  </dependency>
  <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
  </dependency>
  <dependency>
    <groupId>org.scala-lang.modules</groupId>
    <artifactId>scala-xml_2.12</artifactId>
    <version>1.1.1</version>
  </dependency>
</dependencies>
<!-- other sections of the pom are excluded -->
<build>
  <plugins>
    <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.0.0</version>
      <executions>
        <!-- Run shade goal on package phase -->
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <artifactSet>
              <excludes>
                <exclude>org.apache.flink:force-shading</exclude>
                <exclude>com.google.code.findbugs:jsr305</exclude>
                <exclude>org.slf4j:*</exclude>
                <exclude>log4j:*</exclude>
              </excludes>
            </artifactSet>
            <filters>
              <filter>
                <!-- Do not copy the signatures in the META-INF folder.
                     Otherwise, this might cause SecurityExceptions when using the JAR. -->
                <artifact>*:*</artifact>
                <excludes>
                  <exclude>META-INF/*.SF</exclude>
                  <exclude>META-INF/*.DSA</exclude>
                  <exclude>META-INF/*.RSA</exclude>
                </excludes>
              </filter>
            </filters>
            <transformers>
              <transformer
                  implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                <mainClass>com.mycompany.myproj.artifactId.default.StreamingJob</mainClass>
              </transformer>
              <transformer
                  implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                <resource>reference.conf</resource>
              </transformer>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
    <!-- Java Compiler -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.1</version>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
      </configuration>
    </plugin>
    <!-- Scala Compiler -->
    <plugin>
      <groupId>net.alchim31.maven</groupId>
      <artifactId>scala-maven-plugin</artifactId>
      <version>3.2.2</version>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>testCompile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.codehaus.mojo</groupId>
      <artifactId>build-helper-maven-plugin</artifactId>
      <version>1.7</version>
      <executions>
        <!-- Add src/main/scala to eclipse build path -->
        <execution>
          <id>add-source</id>
          <phase>generate-sources</phase>
          <goals>
            <goal>add-source</goal>
          </goals>
          <configuration>
            <sources>
              <source>src/main/scala</source>
            </sources>
          </configuration>
        </execution>
        <!-- Add src/test/scala to eclipse build path -->
        <execution>
          <id>add-test-source</id>
          <phase>generate-test-sources</phase>
          <goals>
            <goal>add-test-source</goal>
          </goals>
          <configuration>
            <sources>
              <source>src/test/scala</source>
            </sources>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
<!-- other sections of the pom are excluded -->
I believe the problem is somehow related to the SAXParser implementation that Flink uses at runtime. I also tried the @transient annotation to keep the parser fields from being serialized by Flink, but that did not help either.
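For reference, the @transient variant I tried looked roughly like the sketch below; the exact field placement and the class name TransientProcessor are illustrative only. On the cluster it still failed with the same LinkageError:

import javax.xml.parsers.{SAXParser, SAXParserFactory}

import org.apache.flink.api.common.functions.MapFunction

import scala.xml.{Elem, XML}
import scala.xml.factory.XMLLoader

// Sketch: the parser lives in the function instance instead of a companion object and is
// marked @transient so it is not shipped with the serialized function but re-created lazily
// on the task manager. In my case this still ended in the LinkageError.
class TransientProcessor extends MapFunction[String, String] {

  @transient private lazy val factory: SAXParserFactory = SAXParserFactory.newInstance
  @transient private lazy val saxParser: SAXParser = factory.newSAXParser
  @transient private lazy val xmlLoader: XMLLoader[Elem] = XML.withSAXParser(saxParser)

  override def map(translatedMessage: String): String =
    xmlLoader.loadString(translatedMessage).toString
}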
However, I am confused about what exactly is happening here. Does anyone know what is going wrong and how to prevent the error?
After a while, I figured out what was going wrong.
The Scala XML documentation says:
In Scala 2.11 and later, add the following to your build.sbt file's libraryDependencies:
"org.scala-lang.modules" %% "scala-xml" % "1.1.1"
which translates to the following in Maven:
<dependency>
  <groupId>org.scala-lang.modules</groupId>
  <artifactId>scala-xml_2.12</artifactId>
  <version>1.1.1</version>
</dependency>
This dependency appears to be unnecessary: even though Flink 1.7.2 seems to use Scala 2.12.8, it still ships Scala XML in its distribution, so scala-xml is already on the cluster's classpath. If I am right, bundling it again in the fat jar can cause problems about which class actually gets loaded, but removing the dependency alone was not the fix for the LinkageError.
The actual fix for the LinkageError was to use Flink's own RichMapFunction[InputT, OutputT]:
import javax.xml.parsers.{SAXParser, SAXParserFactory}

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

import scala.xml.{Elem, XML}
import scala.xml.factory.XMLLoader

class Processor extends RichMapFunction[String, String] {

  // Initialized in open(), i.e. on the task manager where the function actually runs.
  var factory: SAXParserFactory = _
  var SAXParser: SAXParser = _
  var xmlLoader: XMLLoader[Elem] = _

  override def open(parameters: Configuration): Unit = {
    factory = SAXParserFactory.newInstance
    SAXParser = factory.newSAXParser
    xmlLoader = XML.withSAXParser(SAXParser)
  }

  override def map(translatedMessage: String): String = {
    val xml = xmlLoader.loadString(translatedMessage)
    xml.toString
  }
}
As its JavaDoc says:
Initialization method for the function. It is called before the actual working methods (like map or join) and thus suitable for one time setup work. For functions that are part of an iteration, this method will be invoked at the beginning of each iteration superstep.
Unfortunately, vars are the better choice in this case, because the initialization of these values needs to be handled by Flink itself (in open()) to prevent the LinkageError at runtime.
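For completeness, the job side does not need to change; a minimal sketch, with the Kafka setup still omitted and the object name chosen only for illustration:

import org.apache.flink.streaming.api.scala._

object StreamingJobWithRichMap {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(10)

    env
      .fromCollection(List("""<ciao>ciao</ciao>"""))
      // Flink calls Processor.open() once per parallel instance, on the task manager,
      // before the first map() call, so the SAXParser is created where it is used.
      .map(new Processor)
      .print()

    env.execute("Flink-TEST")
  }
}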
A few notes:
- I realized that this probably only happens with DataStream[T] and does not happen with DataSet[T].
- The job needs a parallelism greater than 1 so that more than one task manager loads the same class; this can be tricky to reproduce from the IDE, as explained here (see the sketch after this list).
- Having seen the cause of the problem, companion objects do not seem well suited for holding this kind of state when working with Flink.
- The last point would probably make a good note on Flink's "Scala API Extensions" page, which also explains that Flink normally does not support anonymous pattern-matching functions to deconstruct tuples unless the Flink Scala API extensions are used: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/scala_api_extensions.html
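As a rough sketch of the parallelism note above: from the IDE one can at least ask for a local environment with parallelism greater than 1 (the object name is illustrative; note this still runs in a single JVM, so it may not fully reproduce the class loading behaviour of a real cluster):

import org.apache.flink.streaming.api.scala._

object LocalReproduction {

  def main(args: Array[String]): Unit = {
    // Local mini cluster with parallelism > 1 instead of the IDE default.
    val env = StreamExecutionEnvironment.createLocalEnvironment(4)

    env
      .fromCollection(List("""<ciao>ciao</ciao>""", """<ciao>ciao</ciao>"""))
      .map(new Processor)
      .print()

    env.execute("Flink-TEST-local")
  }
}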