添加一个全球商店供变压器消耗



有没有办法添加全局存储供变压器使用?在变压器文档中,它说:

"将输入流的每个记录转换为输出流中的零或更多记录(可以任意更改键和值类型)。将变压器(由给定的变压器提供)应用于每个输入记录,并计算零或更多输出记录。为了分配状态,必须通过 AddStateStore addglobalstore 通过添加的商店事先创建和注册状态,然后才能连接到变压器&quort&quort&quort&quort&quort"

然而, addglobalstore 的API on processSupplier?

addGlobalStore(storeBuilder: StoreBuilder[_ <: StateStore],
                     topic: String,
                     consumed: Consumed[_, _],
                     stateUpdateSupplier: ProcessorSupplier[_, _])

我的最终目标是使用Kafka流DSL,带有变压器,因为我需要一个flatmap并将键和值转换为我的输出主题。我的拓扑中没有处理器。

我希望这样的事情:

addGlobalStore(storeBuilder: StoreBuilder[_ <: StateStore], topic: String, consumed: Consumed[_, ], stateUpdateSupplier: TransformerSupplier[, _])

传递到 addGlobalStore()Processor用于维护(即编写)商店。请注意,这是可以预期的是,此Processor将数据AS-IS复制到商店中(cf https://issues.apache.org/jira/browse/browse/kafka-7663)。

添加了全球商店后,还可以添加一个Transformer,并且Transformer可以访问该商店。请注意,不需要连接全局商店以使其可用(只需要添加"常规"商店)。另请注意,Transformer仅可以读取对全局商店的访问。

使用处理器而不是变压器,对于您要在输入主题上执行的所有转换,只要有来自GlobalStateStore的用户酶查找数据。使用context.forward(key,value,childName)将数据发送到下游节点。context.forward(key,value,childName)可以在process()punctuate()中多次调用,以便将多个记录发送到下游节点。如果需要更新GlobalStateTore,则仅在传递给addGlobalStore(..) processor 中执行此操作,因为,有一个与GlobalStateStore相关的GlobalStreamThread,可以使商店的状态保持在所有运行的KStream实例中的一致性。<<<<<<<<<<。/p>

最新更新