I am using a simple Kafka reader with country/region keys, and I get the following error when running on Spark:
Exception in thread "main" java.lang.IllegalStateException: No TransformEvaluator registered for UNBOUNDED transform Read(KafkaUnboundedSource)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:588)
    at org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$Translator.translateUnbounded(StreamingTransformTranslator.java:560)
    at org.apache.beam.runners.spark.SparkRunner$Evaluator.translate(SparkRunner.java:451)
I have not been able to find any information about this error. My pipeline:
pipeline
    .apply(
        KafkaIO.<String, String>read()
            .withBootstrapServers(options.getBootstrap())
            .withTopic(options.getInputTopic())
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withConsumerConfigUpdates(ImmutableMap.of("group.id", (Object) "test1"))
            .withoutMetadata())
    .apply(ParDo.of(new DoFn<KV<String, String>, MetricData>() {....
I ran into the same exception. In my case (note: I use Apache Beam with sbt), adding the following assembly merge strategy solved the problem:
assemblyMergeStrategy in assembly ~= { old =>
  {
    // Resolve same-named files from different jars instead of failing on duplicates
    case s if s.endsWith(".properties") => MergeStrategy.filterDistinctLines
    case s if s.endsWith("pom.xml") => MergeStrategy.last
    case s if s.endsWith(".class") => MergeStrategy.last
    case s if s.endsWith(".proto") => MergeStrategy.last
    case s if s.endsWith("libjansi.jnilib") => MergeStrategy.last
    case s if s.endsWith("jansi.dll") => MergeStrategy.rename
    case s if s.endsWith("libjansi.so") => MergeStrategy.rename
    case s if s.endsWith("libsnappyjava.jnilib") => MergeStrategy.last
    case s if s.endsWith("libsnappyjava.so") => MergeStrategy.last
    case s if s.endsWith("snappyjava_snappy.dll") => MergeStrategy.last
    case s if s.endsWith(".dtd") => MergeStrategy.rename
    case s if s.endsWith(".xsd") => MergeStrategy.rename
    // Keep the provider lines from every copy of this service file
    case PathList("META-INF", "services", "org.apache.hadoop.fs.FileSystem") =>
      MergeStrategy.filterDistinctLines
    // Fall back to the previous strategy for everything else
    case s => old(s)
  }
}
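To illustrate why the line-merging strategy matters for files under META-INF/services, here is a minimal Java sketch of what merging distinct lines amounts to. It is a simplified model, not sbt-assembly's actual implementation, and the class name, method name, and provider names are all hypothetical. The assumption behind it is that the exception comes from provider entries being lost when several jars ship a service file with the same name and only one copy survives in the fat jar.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class ServiceFileMerge {

    // Concatenate the lines of all copies of a same-named service file,
    // skipping blank lines and duplicates while keeping first-seen order.
    static List<String> mergeDistinctLines(List<List<String>> copies) {
        Set<String> merged = new LinkedHashSet<>();
        for (List<String> copy : copies) {
            for (String line : copy) {
                String trimmed = line.trim();
                if (!trimmed.isEmpty()) {
                    merged.add(trimmed);
                }
            }
        }
        return new ArrayList<>(merged);
    }

    public static void main(String[] args) {
        // Hypothetical provider names, not real Beam classes.
        List<String> fromJarA = List.of("com.example.a.RegistrarA");
        List<String> fromJarB = List.of("com.example.b.RegistrarB", "com.example.a.RegistrarA");

        // A "last wins" strategy would keep only fromJarB's copy of the file;
        // merging keeps the providers declared by both jars.
        System.out.println(mergeDistinctLines(List.of(fromJarA, fromJarB)));
    }
}
```

With a "last" or "first" strategy only one jar's providers would remain visible to java.util.ServiceLoader, which is the failure mode this kind of merge is meant to avoid.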
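So you may want to look at the "shading" section of the official documentation and add the following to your pom.xml file (I assume it is not there yet; apologies if I am wrong):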
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <createDependencyReducedPom>false</createDependencyReducedPom>
    <filters>
      <filter>
        <artifact>*:*</artifact>
        <excludes>
          <exclude>META-INF/*.SF</exclude>
          <exclude>META-INF/*.DSA</exclude>
          <exclude>META-INF/*.RSA</exclude>
        </excludes>
      </filter>
    </filters>
  </configuration>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <shadedArtifactAttached>true</shadedArtifactAttached>
        <shadedClassifierName>shaded</shadedClassifierName>
        <transformers>
          <transformer
            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
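To check whether the ServicesResourceTransformer actually did its job, one option is to inspect the shaded jar and see which providers each META-INF/services file lists. The following is a rough diagnostic sketch that uses only JDK classes; the class and method names are hypothetical, and which service file matters for this particular exception is not stated in this thread, so the sketch simply prints all of them.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

public class ServiceFileLister {

    private static final String PREFIX = "META-INF/services/";

    // Map each service file name in the jar to the non-blank lines it contains.
    static Map<String, List<String>> listServiceFiles(String jarPath) throws IOException {
        Map<String, List<String>> result = new LinkedHashMap<>();
        try (JarFile jar = new JarFile(jarPath)) {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                JarEntry entry = entries.nextElement();
                String name = entry.getName();
                if (entry.isDirectory() || !name.startsWith(PREFIX)) {
                    continue;
                }
                List<String> providers = new ArrayList<>();
                try (BufferedReader reader = new BufferedReader(
                        new InputStreamReader(jar.getInputStream(entry), StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        if (!line.trim().isEmpty()) {
                            providers.add(line.trim());
                        }
                    }
                }
                result.put(name.substring(PREFIX.length()), providers);
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java ServiceFileLister <path-to-jar>");
            return;
        }
        listServiceFiles(args[0]).forEach(
                (service, providers) -> System.out.println(service + " -> " + providers));
    }
}
```

Running it against the jar built with and without the transformer and comparing the output should show whether provider lines are being dropped during packaging.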