KTable降低功能不会尊重窗口



要求: - 我们需要合并所有具有相同订购的消息,并对合并消息执行后续操作。

说明: - 代码段落下方试图捕获从特定租户收到的所有订单消息,并在等待特定时间段后试图合并到单个订单消息它做以下内容

  1. 基于OrderID的重新分配消息。因此,每个订单消息都将带有tenantid和groupID作为其密钥
  2. 执行Groupby密钥操作,然后进行窗户操作2分钟
  3. 窗口完成后,进行减少操作。
  4. ktable再次转换为流式流,然后将其输出发送到另一个kafka主题

预期输出: - 如果有5个消息在窗口周期中发送相同的订单ID。预计最终的Kafka主题应该只有一条消息,这将是最后一个减少操作消息。

实际输出: - 所有5条消息都表明在调用减少操作之前没有发生窗口。在收到每条消息时,Kafka中看到的所有消息都有适当的减少操作。

查询: - 在kafka流库版本0.11.0.0中,降低功能,用于接受TimeWindow作为其参数。我看到这是在Kafka流版本1.0.0中弃用的。在以下代码中完成的窗口,是否正确?Kafka流库1.0.0的较新版本中是否支持窗口?如果是这样,那么在代码的下图下可以改进一些东西吗?

        String orderMsgTopic = "sampleordertopic";
        JsonSerializer<OrderMsg> orderMsgJSONSerialiser = new JsonSerializer<>();
        JsonDeserializer<OrderMsg> orderMsgJSONDeSerialiser = new JsonDeserializer<>(OrderMsg.class);
        Serde<OrderMsg> orderMsgSerde = Serdes.serdeFrom(orderMsgJSONSerialiser,orderMsgJSONDeSerialiser);

        KStream<String, OrderMsg> orderMsgStream = this.builder.stream(orderMsgTopic, Consumed.with(Serdes.ByteArray(), orderMsgSerde))
                                                                .map(new KeyValueMapper<byte[], OrderMsg, KeyValue<? extends String, ? extends OrderMsg>>() {
                                                                    @Override
                                                                    public KeyValue<? extends String, ? extends OrderMsg> apply(byte[] byteArr, OrderMsg value) {
                                                                        TenantIdMessageTypeDeserializer deserializer = new TenantIdMessageTypeDeserializer();
                                                                        TenantIdMessageType tenantIdMessageType = deserializer.deserialize(orderMsgTopic, byteArr);
                                                                        String newTenantOrderKey = null;
                                                                        if ((tenantIdMessageType != null) && (tenantIdMessageType.getMessageType() == 1)) {
                                                                            Long tenantId = tenantIdMessageType.getTenantId();
                                                                            newTenantOrderKey = tenantId.toString() + value.getOrderKey();
                                                                        } else {
                                                                            newTenantOrderKey = value.getOrderKey();
                                                                        }
                                                                        return new KeyValue<String, OrderMsg>(newTenantOrderKey, value);
                                                                    }
                                                                });

        final KTable<Windowed<String>, OrderMsg> orderGrouping = orderMsgStream.groupByKey(Serialized.with(Serdes.String(), orderMsgSerde))
                                                                                .windowedBy(TimeWindows.of(windowTime).advanceBy(windowTime))
                                                                                .reduce(new OrderMsgReducer());

        orderGrouping.toStream().map(new KeyValueMapper<Windowed<String>, OrderMsg, KeyValue<String, OrderMsg>>() {
                                                                    @Override
                                                                    public KeyValue<String, OrderMsg> apply(Windowed<String> key, OrderMsg value) {
                                                                        return new KeyValue<String, OrderMsg>(key.key(), value);
                                                                    }
                                                                }).to("newone11", Produced.with(Serdes.String(), orderMsgSerde));

我意识到我已经设置了streamsconfig.cache_max_bytes_buffering_config为0,并将默认提交间隔设置为1000ms。更改此值有助于我在某种程度上使窗口工作

相关内容

  • 没有找到相关文章

最新更新