Flink Kafka生产者:类的对象不可序列化



我尝试为我的自定义类实现一种方法,并使用Flink Kafka Connector在Kafka上产生数据。类原型为以下:

public class StreamData implements Serializable {
    private transient StreamExecutionEnvironment env;
    private DataStream<byte[]> data ;
    ...

将数据写入特定Kafka主题的方法类似于:

public void writeDataIntoESB(String id) throws Exception {
        FlinkKafkaProducer011<byte[]> producer = new FlinkKafkaProducer011<byte[]>(
                "localhost:9092",
                id,
                new KeyedSerializationSchema<byte[]>() {
                    @Override
                    public byte[] serializeKey(byte[] bytes) {
                        return bytes;
                    }
                    @Override
                    public byte[] serializeValue(byte[] bytes) {
                        return bytes;
                    }
                    @Override
                    public String getTargetTopic(byte[] bytes) {
                        return null;
                    }
                });               
        data.addSink(producer);
    }

我还有另一种方法可以将数据从KAFKA主题获取到 data提交的对象,这可以正常运行。现在尝试从Kafka主题获取数据并将其写入另一个Kafka主题,我收到了错误:

org.apache.flink.api.common.InvalidProgramException: Object StreamData$2@1593948d is not serializable

主代码:

StreamData temp = new StreamData();
temp = temp.getDataFromESB("data", 0);
temp.writeDataIntoESB("flink_test");

似乎Java试图序列化对象,而不仅仅是字段data!测试了用于使用Flink Kafka连接器向Kafka生产数据的代码,并在常规使用情况下使用(我的意思是不使用类并在Main中编写所有代码(

(

我如何消失错误?

我相信问题的原因是您的代码正在这样做:

new KeyedSerializationSchema<byte[]>() {...}

该代码的作用是创建一个键入的匿名子类,作为定义类的内部类(StreamData(。每个内部类都具有对外部类实例的隐式引用,因此使用默认Java序列化规则序列化它也将试图序列化外部对象(StreamData(。解决此问题的最佳方法是将您的钥匙内序列化chema子声明为:

  • 新的顶级课程或
  • 一个新的静态嵌套类或
  • 一个没有封闭实例的匿名内部类,这意味着将其定义为父类的静态字段。

我认为的最后一种方法看起来像这样:

public class StreamData {
    static KeyedSerializationSchema<byte[]> schema = new KeyedSerializationSchema<byte[]>() {
        ...
    };
    ...
    public void writeDataIntoESB(String id) throws Exception {
        FlinkKafkaProducer011<byte[]> producer = new FlinkKafkaProducer011<byte[]>("localhost:9092", id, schema);               
        data.addSink(producer);
    }
}

您也可以在flink中进行序列化

dataStream.addSink(new FlinkKafkaProducer<KafkaObject>(ProducerTopic, new 
                                             CustomSerializerSchema(),properties));

  public class CustomSerializerSchema implements SerializationSchema<MyUser> {
    private static final long serialVersionUID = 1L;
    @Override
    public byte[] serialize(MyUser element) {
        byte[] b = null;
         try {
             b= new ObjectMapper().writeValueAsBytes(element);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return b; 
    }
}

相关内容

  • 没有找到相关文章

最新更新