java.lang.illegalstateException:无法返回DataFlow 2.x中的默认编码器



我在数据流2.1 SDK中具有简单的管道。然后,从PubSub读取数据,然后将DOFN应用于其中。

PCollection<MyClass> e = streamData.apply("ToE", ParDo.of(new MyDoFNClass()));

在此管道上以下错误:

java.lang.illegalstateException:无法返回toespents/parmultido(mydofnclass)的默认编码器.out0 [pcollection]。纠正以下根本原因之一: 没有手动指定编码器;您可以使用.setCoder()这样做。 从编码器中推断编码器失败:无法为com.x.x.model.myclass。

提供编码器

mydofn类如下:

@DefaultCoder(AvroCoder.class)
public class MyClass{
    public long id;
    public HashMap<String,HashSet<String>> a;
    @SerializedName("a")
    public Integer Id;
    @SerializedName("ae")
    public String ae;
}

找到了将implements Serializable添加到MyClass

的解决方案
@DefaultCoder(AvroCoder.class)
public class MyClass implements Serializable {
public long id;
public HashMap<String,HashSet<String>> a;
@SerializedName("a")
public Integer Id;
@SerializedName("ae")
public String ae;
}

以下是一些关于编码器的文档,来自Beam编程指南

Beam SDK需要管道中每个PCollection的编码器。在大多数情况下,Beam SDK能够根据其元素类型或产生其产生的转换的PCollection自动推断编码器,但是,在某些情况下,管道作者将需要明确指定编码器,或开发一个代码器,或为他们的自定义类型。

每个管道对象都有一个编码器对象,该对象将语言类型映射到默认的编码器,该管道应将管道用于这些类型。您可以自己使用编码器来查找给定类型的默认编码器,也可以为给定类型注册新的默认编码器。

转到下面的链接以查看Beam库使用的默认编码器 - https://beam.apache.org/documentation/programmming-guide/#default-coders-and-the-coderredrigistry

如果您在PCollections中使用的对象不在默认编码器内,则可能必须为该对象提供自定义编码器。例如如果您查看pubsubio.write()/pubsubio.read()方法的插入,它们会使用自定义编码器。例如pubsubmessagepayloadonlycoder

假设您将字符串转换为PubSub消息。您可以将此编码器提供给您的PCollection。

PCollection<PubsubMessage> pubsubMessagePCollection = pCollectionTuple.get(accountId);
pubsubMessagePCollection.setCoder(PubsubMessagePayloadOnlyCoder.of());

最新更新