如何在向apacheflink中的自定义接收器写入之前缓冲记录



我是Flink的新手,需要一些技术用例方面的帮助。

当前场景:

我有一个flink应用程序,它在GKE上运行,并使用自定义接收器将记录(来自Kafka源(写入BigQuery。我可以将记录写入BigQuery而不会出现任何问题。目前,记录被一个接一个地写入接收器,因此每个kafka消息都有自己的对BigQuery的insert api调用,这并不理想,因为我们需要执行批量插入,并且单独插入每个记录将非常昂贵。我使用的是bigquery storagewrite api。

新要求:

在写入BigQuery之前缓冲记录理想情况下,在将记录写入接收器之前,我希望根据大小/时间对其进行缓冲。

我不知道如何在Flink中实现这一点,因此我正在寻找实现功能的方法

您可能可以在数据流上使用流程窗口函数。

例如,如果你使用10秒的翻滚窗口,它将收集该时间段内的所有记录,然后你可以下沉

input.keyBy(t -> t.f0).window(TumblingEventTimeWindows.of(Time.minutes(5))).process(new MyProcessWindowFunction());

https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/operators/windows/#processwindowfunction

最新更新