在 Spring Boot 中实现 KStream / Table 的困惑



我正在尝试获取一些 Spring-Boot kafka 流"动作"工作的样本,但我似乎最终完全困惑:)

我正在通过网络接收 JSON 数据。 我在 avro 中构建了一个用于序列化数据的架构:


{
  "UID": "XJ3_112",
  "type": "11X",
  "state": "PLATFORM_INITIALIZED",
  "fuelremaining": 0,
  "latitude": 50.1232,
  "longitude": -119.257,
  "altitude": 0,
  "time": "2018-07-18T00:00:13.9966Z"
}
{
  "platformUID": "BSG_SS_1_4",
  "type": "OB_334_11",
  "state": "ON_STATION",
  "fuelremaining": -1,
  "latitude": 56.1623,
  "longitude": -44.5614,
  "altitude": 519174,
  "time": "2018-07-18T00:01:43.0871Z"
}

这是据我所知:

@Component
class KStreamTransformer {
    @Autowired
    private lateinit var objectMapper: ObjectMapper
    @StreamListener(MyKafkaStreams.INPUT)
    @SendTo(MyKafkaStreams.OUTPUT)
    fun process(input: KStream<*, TestEntity>) : KStream<*, TestEntity> {
        return input.flatMapValues{
            value ->
            val out = Arrays.asList(value)
            out
        }.groupBy() ???
    }
}

我希望创建一个看起来像这样的 KTable:

|platformUID|state|纬度|隆|替代||-----------|-----|---|---|---|

这就是我让自己感到困惑的地方。

我假设我想在PlatformUID领域做一个GroupBy,但我不清楚如何实际继续前进。

有人可以指出我正确的方向吗?

我想我正在寻找的是获取input流并将其转换为 KTable,键是value.getUID()的,值是它之前的值

如果platformUID已经是数据创建器使用的键,则可以使用

KTable ktable = kstream
                 .groupByKey()
                 .reduce((oldValue, newValue) -> newValue)

如果没有,应该在.groupBy()中放入一个键值映射器,它看起来像

KTable ktable = kstream
                 .groupBy((k, v) -> v.getPlatformUID())
                 .reduce((oldValue, newValue) -> newValue)

它将创建一个内部主题,该主题使用新键对源主题进行重新分区。

对于 Java 7,KeyValueMapper 的语法如下:

KeyValueMapper<K, V1, K> keyValueMapper = new KeyValueMapper<K, V1, K>() {
        public K apply(K key, V1 value) {
            return key;
        }
    };

相关内容

  • 没有找到相关文章

最新更新