Apache Flink 如何从 Java ObjectNode -> JSON 字符串下沉?



所以这需要JSON字符串 ->Java ObjectNode。

final DataStream<ObjectNode> inputStream = env
.addSource(new RMQSource<ObjectNode>(
connectionConfig,                   // config for the RabbitMQ connection
"start",                            // name of the RabbitMQ queue to consume
true,                               // use correlation ids; can be false if only at-least-once is required
new JSONDeserializationSchema()))   // deserialization schema to turn messages into Java objects
.setParallelism(1);                     // non-parallel source is only required for exactly-once

如何将它们从 Java ObjectNode -> JSON 字符串中放回?

stream.addSink(new RMQSink<ObjectNode>(
connectionConfig,
"stop",
new JSONSerializationSchema()
));

JSONSerializationSchema不存在,但我需要这样的东西。

使用如下自定义SerializationSchema

stream.addSink(new RMQSink<ObjectNode>(
connectionConfig,
"stop",
new SerializationSchema<ObjectNode>() {
@Override
public byte[] serialize( ObjectNode element ) {
return element.toString().getBytes();
}
}
));

相关内容

  • 没有找到相关文章

最新更新