对于数据源,我监听多个主题,当我接收到源事件时,我如何知道该事件来自哪个主题,因为主题名称包含我需要的一些信息(例如国家前缀(
我试图阅读flinkKafkaConsumer的源代码,但没有找到任何可以用来从事件中获取主题名称的方法。
如果您正在收听具有不同名称的不同主题,并且您需要这些信息,那么您可能应该使用一个简单的映射来用所需的信息丰富您的事件。
例如,如果您的主题包含具有字段的事件;用户名";以及";时间戳";
{
"username": "diego",
"timestamp": 1649085282804
}
您可以收听您的来源,并在每个来源之后立即制作地图,以便为每个事件添加所需的信息。
val enrichedDataStream: Datastream[CountryCodeEnrichedEvent] = kafkaSourceCanada.map(event => CountryCodeEnrichedEvent(username: event.username, timestamp: event.timestamp, countryCode: "ca"))
你最终会得到这样的东西,对于你正在听的每一个卡夫卡来源。
{
"username": "diego",
"timestamp": 1649085282804,
"countryCode": "ca"
}
主题是传递给KafkaDeserializationSchema
的deserialize
方法的ConsumerRecord
的一部分。因此,如果您想访问这个或其他Kafka元数据,您需要实现自己的反序列化程序。
https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/streaming/connectors/kafka/KafkaDeserializationSchema.html