Kafka 用例:从 Kafka(可能是流)连续读取,使用 Java 方法对消息值执行解密,然后插入到 db(接收器连接器或我有一个 Java REST API,如果需要可以插入到数据库)
我还没有看到这个用例的任何可行的解决方案,因为:
-
我无法在 kafka 中存储解密的数据。
-
我需要在插入数据库之前执行解密步骤。
看看Kafka Connect和Kafka Streams来解决用例:
我似乎无法使用 Kafka Connect,因为我不明白如何包含解密步骤。
我似乎不能使用 Kafka 流,因为 1。它是为读取和写回主题而构建的。2. 即使我使用处理器 API 实现自定义逻辑,我也看不到如何将KStream
消息值转换为可以传递给我的 Java 解密方法或数据库的String
。
我有一个解决方案,它在 Java 应用程序中使用常规的 Kafka 消费者,但它作为 1 次批处理作业执行,我需要一个长期存在并持续检查 Kafka 队列的应用程序。 我可以无限循环批处理作业,但我不确定这是一个可行的选择。
如何完成此用例?Spark不是我们的选择。
您应该使用 Kafka Connect 并通过实现Transformation
接口来编写自定义 SMT:
https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/transforms/Transformation.java
查看此讲座了解更多详情: https://www.confluent.io/thank-you/single-message-transformations-not-transformations-youre-looking/