有没有办法添加全局存储供变压器使用?在变压器文档中,它说:
"将输入流的每个记录转换为输出流中的零或更多记录(可以任意更改键和值类型)。将变压器(由给定的变压器提供)应用于每个输入记录,并计算零或更多输出记录。为了分配状态,必须通过 AddStateStore 或 addglobalstore 通过添加的商店事先创建和注册状态,然后才能连接到变压器&quort&quort&quort&quort&quort"
然而, addglobalstore 的API on processSupplier?
addGlobalStore(storeBuilder: StoreBuilder[_ <: StateStore],
topic: String,
consumed: Consumed[_, _],
stateUpdateSupplier: ProcessorSupplier[_, _])
我的最终目标是使用Kafka流DSL,带有变压器,因为我需要一个flatmap并将键和值转换为我的输出主题。我的拓扑中没有处理器。
我希望这样的事情:
addGlobalStore(storeBuilder: StoreBuilder[_ <: StateStore], topic: String, consumed: Consumed[_, ], stateUpdateSupplier: TransformerSupplier[, _])
传递到 addGlobalStore()
的 Processor
用于维护(即编写)商店。请注意,这是可以预期的是,此Processor
将数据AS-IS复制到商店中(cf https://issues.apache.org/jira/browse/browse/kafka-7663)。
添加了全球商店后,还可以添加一个Transformer
,并且Transformer
可以访问该商店。请注意,不需要连接全局商店以使其可用(只需要添加"常规"商店)。另请注意,Transformer
仅可以读取对全局商店的访问。
使用处理器而不是变压器,对于您要在输入主题上执行的所有转换,只要有来自GlobalStateStore的用户酶查找数据。使用context.forward(key,value,childName)
将数据发送到下游节点。context.forward(key,value,childName)
可以在process()
和punctuate()
中多次调用,以便将多个记录发送到下游节点。如果需要更新GlobalStateTore,则仅在传递给addGlobalStore(..)
的 processor 中执行此操作,因为,有一个与GlobalStateStore相关的GlobalStreamThread,可以使商店的状态保持在所有运行的KStream实例中的一致性。<<<<<<<<<<。/p>