如何在没有架构的情况下监视写入 kafka 主题的'bad'消息



我使用Kafka Connect从RabbitMQ获取数据到Kafka主题。数据没有模式,所以为了关联模式,我使用ksql流。在流的顶部,我创建了一个新的主题,它现在有一个已定义的模式。最后将数据录入BQ数据库。我的问题是如何监视未通过流阶段的消息?这样,我是否支持模式进化?如果没有,如何使用模式注册表功能?

感谢

使用Kafka Connect获取数据…数据没有模式

我不是特别熟悉Rabbitmq连接器,但如果你使用Confluent转换器类使用模式,那么它会有一个,虽然可能只有一个字符串或字节模式


如果ksql正在使用非模式主题,则有一个与该进程相关联的消费者组。您可以监视它的延迟,以了解ksql还没有处理多少消息。如果ksql无法解析消息,因为它是"坏"的,那么我认为它要么被跳过,要么流完全停止消费;这可能是可配置的

例如,如果您将输出主题格式设置为Avro,那么模式将自动注册到Registry。除非您修改流 的字段,否则不会有演化。

最新更新