我是Flink和集群计算的新手。我花了一整天的时间试图在 Flink 上正确解析来自 Kafka 的愚蠢流,但没有结果:这有点令人沮丧...... 我在 kafka 中有一股用字符串键标识的 JSON-LD 消息流。我只想在 Flink 中检索它们,然后用不同的键分隔消息。
1) 最初,我考虑将消息作为字符串而不是JSON-LD发送。我虽然更容易...
我尝试了所有反序列化器,但没有一个有效。简单的反序列化器可以正常工作,但它完全忽略了键。
我相信我必须使用(Flink 显然只有两个支持键的反序列化器):
DataStream<Object> stream = env
.addSource(new FlinkKafkaConsumer010<>("topicTest", new TypeInformationKeyValueSerializationSchema(String.class, String.class, env.getConfig()), properties))
.rebalance();
stream.print();
但我得到:
2017-06-12 02:09:12 来源:自定义源(4/4)切换到失败 java.io.EOFException at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
如何在不丢失密钥的情况下接收流消息?
2) 我的 kafka 生产者是用 javascript 实现的,因为 Flink 支持 JSONDeserialization,虽然直接发送 kafka JSON 对象。 我不确定这是否适用于 JSON-LD,但我使用了:
json.parse(jsonld_message)
将消息序列化为 json。然后我用通常的字符串键发送了这个。
但是在 Flink 中,这段代码不起作用:
DataStream<ObjectNode> stream = env
.addSource(new FlinkKafkaConsumer010<>("topicTest", new JSONKeyValueDeserializationSchema(false), properties))
.rebalance();
stream.print();
提高一个
JsonParserException.
我认为第一种方法更简单,我更喜欢它,因为允许一次考虑一个问题(第一:接收数据,第二:我想使用外部库重新转换 JSON-LD 中的字符串)。
已求解:
最后,我决定实现一个自定义反序列化程序来实现KeyedDeserializedSchema接口。
为了使用 Flink 的TypeInformationKeyValueSerializationSchema
从 Kafka 读取数据,它必须以兼容的方式编写。假设你的键和值是String
型的,那么键和值必须以 Flink 的StringSerializer
理解数据的方式编写。
因此,您必须确保 Kafka 生产者以兼容的方式写入数据。否则 Flink' 将无法读取数据。
** 我遇到了类似的问题。理想情况下,类型信息键值序列化模式与键和值的字符串类型应该能够读取我的 kafka 记录,该记录将键和值作为字符串。但它无法并且如上述帖子所指出的那样有一个EOF例外。因此,此问题很容易重现,需要修复。请让我知道我是否可以在此过程中提供任何帮助。同时,使用
Kafka 反序列化程序架构
.这是代码,因为几乎没有关于它的文档来读取键/值和其他内容: **
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class CustomKafkaSerializer implements KafkaDeserializationSchema<Tuple2<String,String>> {
@Override
public boolean isEndOfStream(Tuple2<String,String> stringStringPair) {
return false;
}
@Override
public Tuple2<String,String> deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
String key = new String(consumerRecord.key());
String value = new String(consumerRecord.value());
return new Tuple2<>(key,value);
}
@Override
public TypeInformation<Tuple2<String,String>> getProducedType() {
return TypeInformation.of(new TypeHint<Tuple2<String, String>>(){});
}
}