Flink Kafka连接器到EventHub



我正在使用apache flink,并尝试通过使用apache kafka协议从中接收消息来连接到Azure EventHub。我设法连接到Azure EventHub并接收消息,但是我无法使用flink功能" setstartfromtimestamp(...)",如下所述(https://ci.apache.org/projects/projects/flink/flink/flink/flink-docs-stable/Dev/connectors/kafka.html#kafka-consumers-start-start-position-configuration)。当我试图从时间戳中获取一些消息时,卡夫卡说,经纪人方面的消息格式在0.10.0之前。有人面对这个吗?Apache Kafka客户端版本是2.0.1Apache Flink版本为1.7.2

更新:尝试在消费者包中添加代码到消费者包中的代码使用时间戳偏移,如果消息版本下的0.10.0 Kafka版本下的消息版本,它将如预期的那样返回NULL。

        List<PartitionInfo> partitionInfos = consumer.partitionsFor(TOPIC);
        List<TopicPartition> topicPartitions = partitionInfos.stream().map(pi -> new TopicPartition(pi.topic(), pi.partition())).collect(Collectors.toList());
        Map<TopicPartition, Long> topicPartitionToTimestampMap = topicPartitions.stream().collect(Collectors.toMap(tp -> tp, tp -> 0L));
        Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp = consumer.offsetsForTimes(topicPartitionToTimestampMap);
        System.out.println(offsetAndTimestamp);

对不起,我们错过了。现在在EH中支持Kafka Offsetsfortimes()。

将来可以随时针对我们的Github开发一个问题。https://github.com/azure/azure-event-hubs-for-kafka

相关内容

  • 没有找到相关文章

最新更新