我有一个kafka流,该流可以从ktable中填写丢失的值(LeftJoin完美地做到了(。但是有时候,我必须为每个值加入一个数组,而我不知道该如何正确执行。
例如(我带一个家庭(:
{father: idFather, mother : idMother, children:[{child: id1},{child: id2}]
我能够与Ktable一起找到父亲和母亲的名字(加入ID(。但是对于孩子来说,我不知道如何循环每个孩子进入阵列(我不知道有多少个孩子(。
目前,我为每个孩子创建新的Kstreams: stream.flatMapValues(value -> value.get("children"))
,我为每个孩子加入。然后,我必须分组并减少或汇总我的数据,以用人员名称重建输入数据。
实际上,这是工作,但我不确定这是这样做的最佳方法,我更喜欢避免内部kafka存储以减少和聚合操作。
有人有更好的主意吗?感谢您的帮助
该方法是正确的。
如果您的KTable数据很小,则可以考虑使用GlobalKTable
进行加入。这允许使用KStream的非钥匙字段将查找到GlobalKTable
。