Using HBaseIO with FlinkRunner fails: unable to provide a Coder for org.apache.hadoop.hbase.client.Muta



I ran into "Unable to provide a Coder for org.apache.hadoop.hbase.client.Mutation" when using HBaseIO with the FlinkRunner. The exception is as follows:

Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for ParDo(HBaseProfile)/ParMultiDo(HBaseProfile).output [PCollection]. Correct one of the following root causes:
No Coder has been manually specified;  you may do so using .setCoder().
Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for org.apache.hadoop.hbase.client.Mutation.
Building a Coder using a registered CoderProvider failed.
See suppressed exceptions for detailed failures.
Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:278)
at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:115)
at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:190)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:536)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
at ac.cn.iie.process.ProfileProcess.process(ProfileProcess.java:91)
at ac.cn.iie.Bootstrap.main(Bootstrap.java:25)

I am already using the maven shade plugin to package my jar:

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.1.0</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <transformers>
          <transformer
              implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
            <mainClass>ac.cn.iie.Bootstrap</mainClass>
          </transformer>
          <transformer
              implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
            <resource>reference.conf</resource>
          </transformer>
        </transformers>
        <relocations>
          <relocation>
            <pattern>org.codehaus.plexus.util</pattern>
            <shadedPattern>org.shaded.plexus.util</shadedPattern>
            <excludes>
              <exclude>org.codehaus.plexus.util.xml.Xpp3Dom</exclude>
              <exclude>org.codehaus.plexus.util.xml.pull.*</exclude>
            </excludes>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>

Does anyone know the cause?

I solved my problem by manually registering the Beam CoderProvider. The code is as follows:

CoderRegistry coderRegistry = pipeline.getCoderRegistry();
coderRegistry.registerCoderProvider(
new HBaseCoderProviderRegistrar().getCoderProviders().get(0));

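HBaseCoderProviderRegistrar provides a list of CoderProviders through its getCoderProviders() method, and index 0 of that list is the provider for HBaseMutationCoder. Because my code only uses HBaseIO.write(), the Mutation coder is the only one I need to register.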

Please note that HBaseCoderProviderRegistrar already registers HBaseMutationCoder for the Mutation type automatically.

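Using the maven shade plugin without handling the services files contained in META-INF/ of the output jar is a common pitfall for users. You want to add the ServicesResourceTransformer to your list of transformers, like this: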

<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
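
To make the pitfall more concrete, here is a small simulation of what happens to same-named META-INF/services files when several jars are combined into one. This is only a sketch and not the shade plugin's code: the jar names and com.example.OtherCoderProviderRegistrar are hypothetical, and the "first copy wins" behavior without the transformer is an assumption for illustration. The point is that without appending, only one copy of the file survives, so a registrar listed in another copy is never found.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simulation only: this is NOT the maven-shade-plugin implementation.
final class ServiceFileMergeSketch {
    // Registrar class named in the answers above.
    static final String HBASE_REGISTRAR =
        "org.apache.beam.sdk.io.hbase.HBaseCoderProviderRegistrar";

    // Hypothetical jars; each ships a services file with the same name.
    static Map<String, List<String>> sampleJars() {
        Map<String, List<String>> jars = new LinkedHashMap<>();
        jars.put("other-io.jar",
            Arrays.asList("com.example.OtherCoderProviderRegistrar")); // hypothetical
        jars.put("hbase-io.jar", Arrays.asList(HBASE_REGISTRAR));
        return jars;
    }

    // Assumed behavior without the transformer: the first copy wins, the rest are dropped.
    static List<String> keepFirst(Map<String, List<String>> jars) {
        for (List<String> entries : jars.values()) {
            return new ArrayList<>(entries);
        }
        return new ArrayList<>();
    }

    // With a services transformer: entries from every copy are appended into one file.
    static List<String> appendAll(Map<String, List<String>> jars) {
        List<String> merged = new ArrayList<>();
        for (List<String> entries : jars.values()) {
            merged.addAll(entries);
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, List<String>> jars = sampleJars();
        System.out.println("without transformer: " + keepFirst(jars));
        System.out.println("with transformer:    " + appendAll(jars));
    }
}
```

In this simulation the HBase registrar only appears in the merged result, which matches the symptom in the question: the class is in the jar, but nothing tells ServiceLoader about it.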

For more details, see the Apache Maven Shade plugin documentation and the ServiceLoader javadoc. There are also many other similar questions on StackOverflow.
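
If you want to check whether your shaded jar still carries the service entries, a quick diagnostic along these lines may help. It is a minimal sketch using only the JDK: the class name ServiceFileCheck is made up for this example, and the default service name org.apache.beam.sdk.coders.CoderProviderRegistrar is my assumption about which interface Beam looks up. Run it with the shaded jar on the classpath and see whether the HBase registrar is listed.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: lists the provider class names declared in
// META-INF/services/<serviceInterface> files visible on the classpath.
final class ServiceFileCheck {
    static List<String> listProviders(String serviceInterface) {
        List<String> providers = new ArrayList<>();
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        if (cl == null) {
            cl = ClassLoader.getSystemClassLoader();
        }
        try {
            String resource = "META-INF/services/" + serviceInterface;
            for (URL url : Collections.list(cl.getResources(resource))) {
                try (BufferedReader reader = new BufferedReader(
                        new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        // Service files allow '#' comments and blank lines.
                        int hash = line.indexOf('#');
                        if (hash >= 0) {
                            line = line.substring(0, hash);
                        }
                        line = line.trim();
                        if (!line.isEmpty()) {
                            providers.add(line);
                        }
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return providers;
    }

    public static void main(String[] args) {
        // Assumed default: the Beam registrar interface name.
        String service = args.length > 0
            ? args[0]
            : "org.apache.beam.sdk.coders.CoderProviderRegistrar";
        for (String provider : listProviders(service)) {
            System.out.println(provider);
        }
    }
}
```

An empty list, or a list without the HBase registrar, would suggest the services file was lost or overwritten during shading.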

Make the POJO class Serializable:

import java.io.Serializable;

public class CustomerEntity implements Serializable {
    private String id;
    private String name;
}
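
As a quick sanity check for this suggestion, the sketch below round-trips a Serializable POJO through standard Java serialization, which is the mechanism a Serializable-based coder relies on. The constructor, getters, and the SerializationRoundTrip helper are assumptions added for the example and are not part of the original answer. Note that this applies to your own element types; it does not by itself change how the HBase Mutation type is encoded.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// POJO from the answer; the constructor and getters are added for illustration.
final class CustomerEntity implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String id;
    private final String name;

    CustomerEntity(String id, String name) {
        this.id = id;
        this.name = name;
    }

    String getId() { return id; }

    String getName() { return name; }
}

// Hypothetical helper: serializes a value to bytes and reads it back.
final class SerializationRoundTrip {
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Value is not serializable", e);
        }
    }

    public static void main(String[] args) {
        CustomerEntity copy = roundTrip(new CustomerEntity("42", "Alice"));
        System.out.println(copy.getId() + " " + copy.getName());
    }
}
```

If the class (or one of its fields) is not Serializable, the round trip fails with an exception, which is a cheap way to catch the problem before running the pipeline.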

@K Fred's answer helped me. But I also found that simply updating all of the apache.beam dependencies to the latest version solves the problem as well.
