在Kafka join中丢失一些事件



我的Spring-Cloud应用程序中有3个不同的流。每天有数十万条记录,但是每天大约有3到4条记录消失。我看到它们出现在日志中,但并没有完成所有的连接。

代码:

@StreamListener
fun processOrderEvent(
@Input(StreamBindings.BUSINESS_CONDITION_ORDER_CHARGED_CHARGE_IN)
chargeEvent: KStream<String, ChargeEvent>,
@Input(StreamBindings.BUSINESS_CONDITION_ORDER_CHARGED_IN)
orderEvent: KStream<String, OrderChargedEvent>,
@Input(StreamBindings.BUSINESS_CONDITION_ORDER_CREATED_CHARGE_IN)
orderCreatedEvent: KStream<String, OrderCreatedEvent>
) {
val tracing = Tracing.newBuilder().build()
val kafkaStreamsTracing = KafkaStreamsTracing.create(tracing)
val chargeKeyValue = chargeEvent
.filter { _, event -> shouldProcessBusinessConditions(event) && (event.flow == "___________" || event.flow == "____")}
.transform(
kafkaStreamsTracing.map<String, ChargeEvent, ByteArray, ByteArray>("processOrderEvent_ChargeEvent") { _, value ->
var traceId = tracing.tracer().currentSpan().context().traceIdString()
val keyValue = KeyValue(value.id.toString().toByteArray(), objectMapper.writeValueAsString(
Charge(
value.id.toString(),
value.amount?.value,
value.status?.name,
LocalDateTime.now().toString(),
value.paymentMethod?.type?.name,
value.creditor?.customerId,
value.channel?.name,
value.amount?.currency?.name,
value.paymentMethod?.installments,
value.card?.brand,
buildSellerEmail(value),
value.createdAt,
value.amount?.summary?.total,
value.amount?.summary?.paid,
value.amount?.summary?.refunded,
value.connect?.id,
value.connect?.name,
value.flow
)).toByteArray())
log.info("m=processOrderEvent traceId=$traceId chargeId=${value.id} step=chargeKeyValue")
keyValue
}
)
val orderKeyValue = orderEvent
.transform(
kafkaStreamsTracing.map<String, OrderChargedEvent, ByteArray, ByteArray>("processOrderEvent_OrderChargedEvent") { _, value ->
var traceId = tracing.tracer().currentSpan().context().traceIdString()
log.info("m=processOrderEvent traceId=$traceId chargeId=${value.chargeId} orderId=${value.orderId} step=orderKeyValue")
KeyValue(value.chargeId.toByteArray(), objectMapper.writeValueAsString(Order(value.orderId, value.chargeId)).toByteArray())
}
)
val orderCreatedKeyValue = orderCreatedEvent
.transform(
kafkaStreamsTracing.map<String, OrderCreatedEvent, ByteArray, ByteArray>("processOrderEvent_OrderCreatedEvent") { _, value ->
var traceId = tracing.tracer().currentSpan().context().traceIdString()
log.info("m=processOrderEvent traceId=$traceId orderId=${value.orderId} step=before_orderCreatedKeyValue")
val originalValue = buildOrderOriginalValue(value)
val keyValue = KeyValue(value.orderId.toByteArray(), objectMapper.writeValueAsString(OrderCreated(value.orderId, originalValue)).toByteArray())
log.info("m=processOrderEvent traceId=$traceId orderId=${value.orderId} originalValue=${originalValue} step=orderCreatedKeyValue")
keyValue
}
)
chargeKeyValue.join(orderKeyValue, OrderChargeValueJoiner(), JoinWindows.of(Duration.ofHours(5)))
.transform(
kafkaStreamsTracing.map<ByteArray, ByteArray, ByteArray, ByteArray>("processOrderEvent_OrderChargedEvent_ChargeEvent") { _, value ->
var traceId = tracing.tracer().currentSpan().context().traceIdString()
val orderWithChargeJson = objectMapper.readValue(value, OrderWithCharge::class.java)
val keyValue = KeyValue(orderWithChargeJson.order.orderId!!.toByteArray(), value)
log.info("m=processOrderEvent traceId=$traceId chargeId=${orderWithChargeJson.charge.chargeId} orderId=${orderWithChargeJson.order.orderId} step=orderKeyValueJoin")
keyValue
}
)
.join(orderCreatedKeyValue, OrderCreatedValueJoiner(), JoinWindows.of(Duration.ofHours(5)))
.transform(
kafkaStreamsTracing.map<ByteArray, ByteArray, ByteArray, ByteArray>("processOrderEvent_OrderChargedEvent_ChargeEvent_OrderCreatedEvent") { key, value ->
var traceId = tracing.tracer().currentSpan().context().traceIdString()
val event = objectMapper.readValue(value, OrderWithChargeAndOrderCreated::class.java)
log.info("m=processOrderEvent traceId=$traceId chargeId=${event.charge.chargeId} orderId=${event.order.orderId} step=orderCreatedKeyValueJoin")
KeyValue(key, objectMapper.writeValueAsString(OrderWithChargeAndOrderCreatedTraceId(traceId, event)).toByteArray())
}
).process(ProcessorSupplier { businessConditionCkoutEventProcessor })
}

少数不工作的日志之一:

9/1/215:48:15.863 AM
e2e64c0faa9e 05:48:15.863[订单-charge -charge -v3-df80aa80-d3f6-48c7-a862-7a05c55d5d24- streamthread -1] INFO up .p.p.s someeventstreams - m=processOrderEvent traceId=06352e7dfb290f6f chargeId=-c798-4891--58ecdbf7ccf7 orderId= order__413d - 4d5c - _1873f0db5ade step=orderKeyValueJoin

9/1/215:48:15.763 AM
c232aa303f2f 05:48:15.763[订单-charge -charge -v3-a1f58fb2-63d6-4195-bffd-6a9009b00707- streamthread -1] INFO up .p.p.s someeventstreams - m=processOrderEvent traceId=06352e7dfb290f6f chargeId=-c798-4891--58ecdbf7ccf7 orderId= order__413d - 4d5c - _1873f0db5ade step=orderKeyValue

9/1/2105:48:15.749 AM
c232aa303f2f 05:48:15.749[订单-收费-收费-v3-a1f58fb2-63d6-4195-bffd-6a9009b00707- streamthread -1] INFO up .p.p.s someeventstreams - m=processOrderEvent traceId=7f8c7bb5bbdb422b chargeId=-c798-4891--58ecdbf7ccf7 step=chargeKeyValue

8/31/216:31:50.499 PM
c232aa303f2f 18:31:50.499 [orders- charge- charge- v3-a1f58fb2-63d6-4195-bffd-6a9009b00707- streamthread -1] INFO up .p.p.s someeventstreams - m=processOrderEvent traceId=95fc78a495f03b64 orderId= order__413d - 4d5c - _1873f0db5ade originalValue=22000 step=orderCreatedKeyValue

8/31/216:31:50.499 PM
c232aa303f2f 18:31:50.499 [orders- charge- charge- v3-a1f58fb2-63d6-4195-bffd-6a9009b00707- streamthread -1] INFO up .p.p.s someeventstreams - m=processOrderEvent traceId=95fc78a495f03b64 orderId= orde_3_413d - 4d5c - _1873f0db5ade step=before_orderCreatedKeyValue

工作的数千个日志之一的日志:

9/2/216:34:33.547 PM
f84980d99867 18:34:33.547 [orders- charge- charge -charge -v3-63c5bb8a-8026-4b05-937f- c41360f90202 - streamthread -1] INFO up .p.p.s someeventstreams - m=processOrderEvent traceId=3b5981d48ac7e130 chargeId=-243a--8576- 3cb1173804dd5 orderId= order__084f - 4cce - _ba4dcaf1d50a step=orderCreatedKeyValueJoin

9/2/216:34:33.446 PM
e2e64c0faa9e 18:34:33.446 [orders-charge -charge -v3-df80aa80-d3f6-48c7-a862-7a05c55d5d24- streamthread -1] INFO up .p.p.s someeventstreams - m=processOrderEvent traceId=3b5981d48ac7e130 chargeId=3834ad80-243a-4ffe-8576- 3cb1173804dd5 orderId= order__084f - 4cce - _ba4dcaf1d50a step=orderKeyValueJoin

9/2/216:34:33.346 PM
f84980d99867 18:34:33.346 [orders- charge- charge- v3-63c5bb8a-8026-4b05-937f- c41360f90202 - streamthread -1] INFO up .p.p. . someeventstreams - m=processOrderEvent traceId=6501784c0bf59948 orderId= order__084f - 4cce - _ba4dcaf1d50a originalValue=85400 step=orderCreatedKeyValue

9/2/216:34:33.346 PM
f84980d99867 18:34:33.346 [orders- charge- charge- v3-63c5bb8a-8026-4b05-937f- c41360f90202 - streamthread -1] INFO up .p.p. . someeventstreams - m=processOrderEvent traceId=6501784c0bf59948 orderId= order__084f - 4cce - _ba4dcaf1d50a step=before_orderCreatedKeyValue

9/2/216:34:33.344 PM
c232aa303f2f 18:34:33.344[订单-charge -charge -v3-a1f58fb2-63d6-4195-bffd-6a9009b00707- streamthread -1] INFO up .p.p.s someeventstreams - m=processOrderEvent traceId=3b5981d48ac7e130 chargeId=-243a-4ffe--3cb1173804d5 orderId=- 084f - 4cce -- ba4dcaf1d50a step=orderKeyValue

9/2/216:34:33.326 PM
c232aa303f2f 18:34:33.326[订单-收费-收费-v3-a1f58fb2-63d6-4195-bffd-6a9009b00707- streamthread -1] INFO up .p.p.s someeventstreams - m=processOrderEvent traceId=efd790c85a8f053c chargeId=-4ffe--3cb1173804d5 step=chargeKeyValue

"OrderCreatedKeyValuejoin"日志是结束,在大多数情况下显示,但是,在某些情况下,事件永远不会在结束时出现。

您需要检查如何使用Kafka实现DLQ。

要实现DLQ,您需要创建单独的队列,无论主题失败,您都需要将其放在DLQ中。写重试逻辑来读。

也许这份文件对你有帮助https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/2.1.0.RC1/multi/multi_kafka-dlq-processing.html

使用spring-kafka的Kafka死信队列(DLQ)

相关内容

  • 没有找到相关文章

最新更新