如何忽略 Kafka 流应用程序中的某些类型的消息,该应用程序从同一主题读取和写入不同的事件类型



假设Spring Cloud Stream应用程序从order topic创建KStream。它对OrderCreated {"id":x, "productId": y, "customerId": z}事件感兴趣。到达后,它会对其进行处理并生成OrderShipped {"id":x, "productId": y, "customerName": <, "customerAddress": z}到同一order topic的输出事件。

我面临的问题是,由于它从同一主题读取和写入,因此 Kafka Stream 应用程序正在尝试处理自己的写入,这没有意义。

如何防止此应用程序处理它生成的事件?

更新:正如Artem Bilan和sobychako指出的那样,我曾考虑过使用KStream.filter()但有一些细节让我怀疑如何处理这个问题:

现在,KStream 应用程序如下所示:

interface ShippingKStreamProcessor {
...
@Input("order")
fun order(): KStream<String, OrderCreated>
@Output("output")
fun output(): KStream<String, OrderShipped>

KStream 配置

@StreamListener
@SendTo("output")
fun process(..., @Input("order") order: KStream<Int, OrderCreated>): KStream<Int, OrderShipped> {

订单绑定和输出绑定都指向订单主题作为目标。

顺序创建的类:

data class OrderCreated(var id: Int?, var productId: Int?, var customerId: Int?) {
constructor() : this(null, null, null)
}

订单已发货类

data class OrderShipped(var id: Int?, var productId: Int?, var customerName: String?, var customerAddress: String?) {
constructor() : this(null, null, null, null)
}

我使用JSON作为消息格式,因此消息如下所示:

  • 输入 - 订单创建:{"id":1, "productId": 7,"customerId": 20}
  • 输出 - 订单发货:{"id":1, "productId": 7, "customerName": "X", "customerAddress": "Y"}

考虑到这一点,我正在寻找过滤掉不需要的消息的最佳方法

如果我现在只使用KStream.filter(),当我得到{"id":1, "productId": 7, "customerName": "X", "customerAddress": "Y"}时,我的KStream<Int, OrderCreated>会将 OrderShipped 事件解组为具有一些空字段的 OrderCreated 对象:OrderCreated(id:1, productId: 7, customerId: null).检查空字段听起来并不可靠。

一个可能的解决方案是向使用该主题的每种消息/类添加另一个字段eventType = OrderCreated|OrderShipped。即使在这种情况下,我最终也会有一个属性 eventType=OrderShip 的 OrderCreated 类(记住 KStream<Int,OrderCreated>)。这看起来像一个丑陋的解决方法。有什么想法可以改进它吗?

有没有另一种更自动的方法来解决这个问题?例如,如果消息不符合预期的架构(OrderCreated),另一种序列化(AVRO?)是否会阻止消息被处理? 根据本文,这种在同一主题中支持多个架构(事件类型)的方式似乎是一种很好的做法:https://www.confluent.io/blog/put-several-event-types-kafka-topic/但是,目前尚不清楚如何解组/反序列化不同的类型。

我已经接受了布鲁诺的回答作为解决这个问题的有效方法。但是,我认为我已经想出了一种更直接/合乎逻辑的方法,使用带有JsonTypeInfo注释的事件层次结构。

首先,您需要一个 Order 事件的基类并指定所有子类。请注意,将向 JSON 文档添加一个类型属性,该属性将帮助杰克逊封送/取消封送 DTO:

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
@JsonSubTypes(value = [
JsonSubTypes.Type(value = OrderCreatedEvent::class, name = "orderCreated"),
JsonSubTypes.Type(value = OrderShippedEvent::class, name = "orderShipped")
])
abstract class OrderEvent
data class OrderCreatedEvent(var id: Int?, var productId: Int?, var customerId: Int?) : OrderEvent() {
constructor() : this(null, null, null)
}
data class OrderShippedEvent(var id: Int?, var productId: Int?, var customerName: String?, var customerAddress: String?) : OrderEvent () {
constructor() : this(null, null, null, null)
}

有了这个,OrderCreatedEvent 对象的生产者将生成如下消息:

key: 1 value: {"type":"orderCreated","id":1,"productId":24,"customerId":1}

现在轮到KStream了。我已将签名更改为KStream<Int, OrderEvent>,因为它可以接收OrderCreatedEvent或OrderShippedEvent。在接下来的两行中...

orderEvent.filter { _, value -> value is OrderCreatedEvent }
.map { key, value -> KeyValue(key, value as OrderCreatedEvent) }

。我过滤以仅保留 OrderCreatedEvent 类的消息,并映射它们以将KStream<Int, OrderEvent>转换为KStream<Int, OrderCreatedEvent>

完整的 KStream 逻辑:

@StreamListener
@SendTo("output")
fun process(@Input("input") input: KStream<Int, Customer>, @Input("order") orderEvent: KStream<Int, OrderEvent>): KStream<Int, OrderShippedEvent> {
val intSerde = Serdes.IntegerSerde()
val customerSerde = JsonSerde<Customer>(Customer::class.java)
val orderCreatedSerde = JsonSerde<OrderCreatedEvent>(OrderCreatedEvent::class.java)
val stateStore: Materialized<Int, Customer, KeyValueStore<Bytes, ByteArray>> =
Materialized.`as`<Int, Customer, KeyValueStore<Bytes, ByteArray>>("customer-store")
.withKeySerde(intSerde)
.withValueSerde(customerSerde)
val customerTable: KTable<Int, Customer> = input.groupByKey(Serialized.with(intSerde, customerSerde))
.reduce({ _, y -> y }, stateStore)

return (orderEvent.filter { _, value -> value is OrderCreatedEvent }
.map { key, value -> KeyValue(key, value as OrderCreatedEvent) }
.selectKey { _, value -> value.customerId } as KStream<Int, OrderCreatedEvent>)
.join(customerTable, { orderIt, customer ->
OrderShippedEvent(orderIt.id, orderIt.productId, customer.name, customer.address)
}, Joined.with(intSerde, orderCreatedSerde, customerSerde))
.selectKey { _, value -> value.id }
//.to("order", Produced.with(intSerde, orderShippedSerde))
}

在此过程之后,我将生成一条key: 1 value: {"type":"orderShipped","id":1,"productId":24,"customerName":"Anna","customerAddress":"Cipress Street"}到订单主题中的新消息,但这将被流过滤掉。

您可以使用 Kafka 的记录标头来存储记录的类型。见KIP-82。您可以在ProducerRecord中设置标头。

处理过程如下:

  1. 阅读主题中带有值 serdeSerdes.BytesSerdeKStream<Integer, Bytes>类型的stream
  2. 使用KStream#transformValues()筛选和创建对象。更具体地说,在transformValues()中,您可以访问ProcessorContext,该使您可以访问包含有关记录类型的信息的记录标头。然后:

    • 如果类型为OrderShipped,则返回null
    • 否则,从Bytes对象创建一个OrderCreated对象并返回它。

对于使用 AVRO 的解决方案,您可能需要查看以下文档

  • https://docs.confluent.io/current/streams/developer-guide/datatypes.html
  • https://docs.confluent.io/current/schema-registry/serializer-formatter.html

相关内容

  • 没有找到相关文章

最新更新