Apache Beam Kafka [ No TransformEvaluator registered for UNB



I am using a simple Kafka reader with a country/region key, and I get the following error when running on Spark:

Exception in thread "main" java.lang.IllegalStateException: No TransformEvaluator registered for UNBOUNDED transform Read(KafkaUnboundedSource)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:588)
    at org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$Translator.translateUnbounded(StreamingTransformTranslator.java:560)
    at org.apache.beam.runners.spark.SparkRunner$Evaluator.translate(SparkRunner.java:451)

I could not find any information about this error.

pipeline
    .apply(
        KafkaIO.<String, String>read()
            .withBootstrapServers(options.getBootstrap())
            .withTopic(options.getInputTopic())
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withConsumerConfigUpdates(ImmutableMap.of("group.id", (Object) "test1"))
            .withoutMetadata())
    .apply(ParDo.of(new DoFn<KV<String, String>, MetricData>() {....

I ran into the same exception. In my case (note: I am using Apache Beam with sbt), adding the following assembly merge strategy solved the problem:

assemblyMergeStrategy in assembly ~= { old =>
  {
    case s if s.endsWith(".properties")           => MergeStrategy.filterDistinctLines
    case s if s.endsWith("pom.xml")               => MergeStrategy.last
    case s if s.endsWith(".class")                => MergeStrategy.last
    case s if s.endsWith(".proto")                => MergeStrategy.last
    case s if s.endsWith("libjansi.jnilib")       => MergeStrategy.last
    case s if s.endsWith("jansi.dll")             => MergeStrategy.rename
    case s if s.endsWith("libjansi.so")           => MergeStrategy.rename
    case s if s.endsWith("libsnappyjava.jnilib")  => MergeStrategy.last
    case s if s.endsWith("libsnappyjava.so")      => MergeStrategy.last
    case s if s.endsWith("snappyjava_snappy.dll") => MergeStrategy.last
    case s if s.endsWith(".dtd")                  => MergeStrategy.rename
    case s if s.endsWith(".xsd")                  => MergeStrategy.rename
    // Merge the service registration file instead of keeping only one copy
    case PathList("META-INF", "services", "org.apache.hadoop.fs.FileSystem") =>
      MergeStrategy.filterDistinctLines
    // Everything else falls back to the previously configured strategy
    case s => old(s)
  }
}
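
To illustrate why the choice of merge strategy can matter here, the following is a minimal Java sketch of the difference between merging duplicate files line by line and keeping only the last copy. It only approximates what sbt-assembly's MergeStrategy.filterDistinctLines and MergeStrategy.last do. The class name, method names, and registrar names are hypothetical and not part of sbt-assembly or Beam.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class ServiceFileMerge {

    /** Concatenates all copies and keeps each non-blank line once, in first-seen order. */
    static List<String> mergeDistinctLines(List<List<String>> files) {
        Set<String> merged = new LinkedHashSet<>();
        for (List<String> file : files) {
            for (String line : file) {
                String trimmed = line.trim();
                if (!trimmed.isEmpty()) {
                    merged.add(trimmed);
                }
            }
        }
        return new ArrayList<>(merged);
    }

    /** "Last wins": only the final copy survives, earlier entries are lost. */
    static List<String> keepLast(List<List<String>> files) {
        if (files.isEmpty()) {
            return new ArrayList<>();
        }
        return new ArrayList<>(files.get(files.size() - 1));
    }

    public static void main(String[] args) {
        // Two dependencies ship a file with the same path but different entries
        // (hypothetical registrar names, for illustration only).
        List<List<String>> copies = Arrays.asList(
            Arrays.asList("com.example.RegistrarA"),
            Arrays.asList("com.example.RegistrarB"));

        System.out.println(mergeDistinctLines(copies)); // [com.example.RegistrarA, com.example.RegistrarB]
        System.out.println(keepLast(copies));           // [com.example.RegistrarB]
    }
}
```

If a file under META-INF/services ends up in the fat JAR through a "last wins" style rule, any provider listed only in an earlier copy is no longer discoverable at runtime, which is one plausible way to end up with a missing registration.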

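So you may want to look at the "shading" section of the official documentation and add the following to your pom.xml (I assume it is not there yet; apologies if I am wrong):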

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <createDependencyReducedPom>false</createDependencyReducedPom>
    <filters>
      <filter>
        <artifact>*:*</artifact>
        <excludes>
          <exclude>META-INF/*.SF</exclude>
          <exclude>META-INF/*.DSA</exclude>
          <exclude>META-INF/*.RSA</exclude>
        </excludes>
      </filter>
    </filters>
  </configuration>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <shadedArtifactAttached>true</shadedArtifactAttached>
        <shadedClassifierName>shaded</shadedClassifierName>
        <transformers>
          <transformer
              implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
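
The ServicesResourceTransformer in the configuration above merges META-INF/services files from all dependencies into the shaded JAR. As a rough way to check the result, the sketch below lists every provider declared for a given service name on the current classpath. The class and method names are hypothetical, and the default service name is an assumption about the Beam version in use (the interface may live in a different package in other releases), so pass the name that matches your build.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

public class ServiceFileCheck {

    /** Removes a trailing '#' comment and surrounding whitespace from a service-file line. */
    static String stripComment(String line) {
        int hash = line.indexOf('#');
        return (hash >= 0 ? line.substring(0, hash) : line).trim();
    }

    /** Returns all provider class names declared under META-INF/services/<serviceName>. */
    static List<String> listProviders(String serviceName) {
        List<String> providers = new ArrayList<>();
        try {
            Enumeration<URL> urls = ServiceFileCheck.class.getClassLoader()
                .getResources("META-INF/services/" + serviceName);
            while (urls.hasMoreElements()) {
                URL url = urls.nextElement();
                try (BufferedReader reader = new BufferedReader(
                        new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        String name = stripComment(line);
                        if (!name.isEmpty()) {
                            providers.add(name);
                        }
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return providers;
    }

    public static void main(String[] args) {
        // Assumed default; replace with the registrar interface of your Beam version.
        String service = args.length > 0
            ? args[0]
            : "org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar";
        List<String> providers = listProviders(service);
        System.out.println(providers.size() + " provider(s) declared for " + service);
        for (String provider : providers) {
            System.out.println("  " + provider);
        }
    }
}
```

Run it with the shaded JAR on the classpath. If the list is empty or noticeably shorter than when running against the unshaded dependencies, the service files were probably not merged during packaging.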
