问题:我在 kafka 主题中有记录,需要使用来自 MySQL 数据库(以及其他数据库(的元数据进行扩充。使用 Flink,可以实现丰富的映射器,因此一个连接可以重用于多个记录。
在 kafka 流 (java( 中是否有类似的功能?
随机想法:到目前为止,我发现了以下变体:
- 映射器 (lambda(:为每个记录创建一个新实例...
- 转换器/处理器:用于有状态操作(太好了!(,但也意味着使用状态存储(在我的用例中不需要(
我错过了什么?
注意:我也考虑过 kafka-connect,但我需要在两个 kafka 主题之间转换数据,而不是在外部系统之间转换数据......
这正是Matthias J. Sax在他的回答中所说的:Processor
和Transformer
可以是无国籍的,也可以是有状态的。
作为参考,我还要指出Confluent的Kafka Streams API文档中的以下片段(http://docs.confluent.io/3.2.0/streams/developer-guide.html#processor-api 介绍(
处理器 API 可用于实现无状态操作和有状态操作,后者是通过使用状态存储来实现的。
还有一个演示应用程序可以实现无状态Transformer
:https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/test/java/io/confluent/examples/streams/MixAndMatchLambdaIntegrationTest.java
上面的示例(confluentinc/examples的分支3.2.x
(适用于Confluent 3.2.0和Apache Kafka 0.10.2.0。
您也可以在没有状态的情况下使用 transfrom()/process()
- 状态是可选的。因此,这应该为您提供所需的内容。