Apache Flink - 如何使用 AWS Kinesis 发送和使用 POJO



我想使用 Flink 使用来自 Kinesis 的 POJO。
对于如何正确发送和反序列化消息,是否有任何标准?

谢谢

我用:

DataStream<SamplePojo> kinesis = see.addSource(new FlinkKinesisConsumer<>(
        "my-stream",
        new POJODeserializationSchema(),
        kinesisConsumerConfig));

public class POJODeserializationSchema extends AbstractDeserializationSchema<SamplePojo> {
    private ObjectMapper mapper;
    @Override
    public SamplePojo deserialize(byte[] message) throws IOException {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        SamplePojo retVal = mapper.readValue(message, SamplePojo.class);
        return retVal;
    }
    @Override
    public boolean isEndOfStream(SamplePojo nextElement) {
        return false;
    }
}

相关内容

  • 没有找到相关文章

最新更新