我正在使用KStream<key, coordinates>
工作,我需要确定每个坐标在哪个周长中并输出KStream<key, perimeterId>
。
所有周围都在GlobalKTable<perimeterID, perimeterVertices>
内。我有一个点对点函数来执行此操作,我只是不知道如何在Kafka-streams中使用它,而没有明显的表值函数或Cross应用于Kafka-streams。
我最终做出了一种难以扩展的丑陋方式。
我的Globalktable现在是单行"1" List<perimeterID,perimeterVertices>
。这使我可以使用" 1"作为每个事件的密钥离开加入。在Value Joininer中,我运行我的点式函数并设置周长ID,然后将<key, perimeterId>
返回到我的输出流。