使用Kafka流进行自定义转换



我一直在使用Apache Kafka实现ETL数据管道。我已经使用了Kafka Connect进行提取和负载。

Connect将读取源数据并编写KAFKA主题的实际数据可用。

的形式。

在转换阶段,我想从KAFKA主题中读取JSON数据,然后我需要根据某些自定义业务逻辑转换为SQL查询,然后需要写入输出KAFKA主题。

截至目前,我已经写了一个生产者 - 消费者应用程序,该应用程序从主题进行转换,然后写入输出主题。

是否可以使用KAFKA流API实现相同的操作?如果是,请提供一些样本。

查看kafka流或ksql。KSQL在Kafka流的顶部运行,并为您提供了一种非常简单的方法来构建您所谈论的聚合。

这是在KSQL中进行数据流的聚合

的一个示例
SELECT PAGE_ID,COUNT(*) FROM PAGE_CLICKS WINDOW TUMBLING (SIZE 1 HOUR) GROUP BY PAGE_ID

请参阅更多信息:https://www.confluent.io/blog/ususe-ksql-to-analyse-query-anlyse--query-and-transform-data-in-kafka

您可以采用KSQL的输出,这实际上只是一个Kafka主题,然后通过Kafka Connect进行流式传输。到Elasticsearch,Cassandra等。

相关内容

  • 没有找到相关文章

最新更新