我尝试为我的自定义类实现一种方法,并使用Flink Kafka Connector在Kafka上产生数据。类原型为以下:
public class StreamData implements Serializable {
private transient StreamExecutionEnvironment env;
private DataStream<byte[]> data ;
...
将数据写入特定Kafka主题的方法类似于:
public void writeDataIntoESB(String id) throws Exception {
FlinkKafkaProducer011<byte[]> producer = new FlinkKafkaProducer011<byte[]>(
"localhost:9092",
id,
new KeyedSerializationSchema<byte[]>() {
@Override
public byte[] serializeKey(byte[] bytes) {
return bytes;
}
@Override
public byte[] serializeValue(byte[] bytes) {
return bytes;
}
@Override
public String getTargetTopic(byte[] bytes) {
return null;
}
});
data.addSink(producer);
}
我还有另一种方法可以将数据从KAFKA主题获取到 data
提交的对象,这可以正常运行。现在尝试从Kafka主题获取数据并将其写入另一个Kafka主题,我收到了错误:
org.apache.flink.api.common.InvalidProgramException: Object StreamData$2@1593948d is not serializable
主代码:
StreamData temp = new StreamData();
temp = temp.getDataFromESB("data", 0);
temp.writeDataIntoESB("flink_test");
似乎Java试图序列化对象,而不仅仅是字段data
!测试了用于使用Flink Kafka连接器向Kafka生产数据的代码,并在常规使用情况下使用(我的意思是不使用类并在Main中编写所有代码(
我如何消失错误?
我相信问题的原因是您的代码正在这样做:
new KeyedSerializationSchema<byte[]>() {...}
该代码的作用是创建一个键入的匿名子类,作为定义类的内部类(StreamData(。每个内部类都具有对外部类实例的隐式引用,因此使用默认Java序列化规则序列化它也将试图序列化外部对象(StreamData(。解决此问题的最佳方法是将您的钥匙内序列化chema子声明为:
:- 新的顶级课程或
- 一个新的静态嵌套类或
- 一个没有封闭实例的匿名内部类,这意味着将其定义为父类的静态字段。
我认为的最后一种方法看起来像这样:
public class StreamData {
static KeyedSerializationSchema<byte[]> schema = new KeyedSerializationSchema<byte[]>() {
...
};
...
public void writeDataIntoESB(String id) throws Exception {
FlinkKafkaProducer011<byte[]> producer = new FlinkKafkaProducer011<byte[]>("localhost:9092", id, schema);
data.addSink(producer);
}
}
您也可以在flink中进行序列化
dataStream.addSink(new FlinkKafkaProducer<KafkaObject>(ProducerTopic, new
CustomSerializerSchema(),properties));
public class CustomSerializerSchema implements SerializationSchema<MyUser> {
private static final long serialVersionUID = 1L;
@Override
public byte[] serialize(MyUser element) {
byte[] b = null;
try {
b= new ObjectMapper().writeValueAsBytes(element);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return b;
}
}