我正在从kafka主题中获取数据,然后使用flatMap爆炸数组并产生多个事件。
传入事件格式:
Event(eventId: Long, time: Long)
IncomingEvent(customerId: Long, events: List[Event])
传入事件爆炸后的事件格式:
EventAfterExploding(customerId: Long, eventId: Long, time: Long)
这些事件将通过Flink提供的JDBC sink写入MySQL。
存储在相同Kafka分区中的数据具有相同的客户id,这意味着我在这里没有任何排序问题。但是在一个事件中可以有很多的eventIds
,所以这意味着在flatMap
操作之后在同一个flink分区中可以有很多事件。这将导致延迟或OOM问题,因为一个操作员必须处理更多的数据。为了防止这个问题,我可以应用重分区或增加并行性。但是这里还有一个问题,每个(customerId, eventId)
对都必须被发送到相同的接收器操作符,因为如果不同的编写器试图操作同一对,可能会出现竞争条件问题。例如,
event1 => EventAfterExploding(1, 1, 1)
event2 => EventAfterExploding(1, 1, 2)
在这种情况下,数据库必须包含具有最新时间的event2
,但如果这两个数据进入不同的sink分区,则event1
可以在数据库中而不是event2
。
当同一分区中有大量数据时,我如何解决竞争条件问题和伸缩问题?应用下面给出的代码块能解决这些问题吗?我认为下面给出的代码,因为在keyBy
操作之后,数据将被重新分发,它也将保证相同的数据将被发送到相同的分区,但只是想确保。谢谢!
incomingEvents
.flatMap(new ExplodingFunction())
.keyBy(event => (event.customerId, event.eventId))
.addSink(JdbcSink.sink(...))
是的,该代码将具有您正在寻找的效果。相同的customerId和eventId的所有事件将转到接收器的同一实例。