我一直在使用Apache Kafka实现ETL
数据管道。我已经使用了Kafka Connect进行提取和负载。
Connect将读取源数据并编写KAFKA主题的实际数据可用。
的形式。在转换阶段,我想从KAFKA主题中读取JSON数据,然后我需要根据某些自定义业务逻辑转换为SQL查询,然后需要写入输出KAFKA主题。
截至目前,我已经写了一个生产者 - 消费者应用程序,该应用程序从主题进行转换,然后写入输出主题。
是否可以使用KAFKA流API实现相同的操作?如果是,请提供一些样本。
查看kafka流或ksql。KSQL在Kafka流的顶部运行,并为您提供了一种非常简单的方法来构建您所谈论的聚合。
这是在KSQL中进行数据流的聚合
的一个示例SELECT PAGE_ID,COUNT(*) FROM PAGE_CLICKS WINDOW TUMBLING (SIZE 1 HOUR) GROUP BY PAGE_ID
请参阅更多信息:https://www.confluent.io/blog/ususe-ksql-to-analyse-query-anlyse--query-and-transform-data-in-kafka
您可以采用KSQL的输出,这实际上只是一个Kafka主题,然后通过Kafka Connect进行流式传输。到Elasticsearch,Cassandra等。