我有一个持续接收事件的AWS kineesis数据流,这些事件被发送到kineesis数据分析,以使用apache flink通过滚动窗口获得指标。
是否有可能转储x%的随机数据在滚动窗口s3桶?如果是,请分享代码片段。
您的滚动窗口输出,如果没有聚合,我假设将是某种Seq[Element]
。我能想到的最简单的方法是使用flatMap
操作符对想要存储在S3中的元素进行采样,并将输出连接到写入S3的FileSink
。
根据输出格式的不同,代码看起来会有很大的不同。最简单的例子是:
val windowOutStream: Seq[Element] = ...
val sampledOutStream: Seq[String] = windowOutStream.flatMap(window => {
// iterate over all the elements in the window
// filter the ones you want to store to S3
// encode the element as a string and add the window start/end times so you can later identify to which window they belonged to
})
// writeAsText will write each element in a new line in the same file
sampledOutStream.writeAsText("s3://<bucket>/<endpoint>");
您可能希望使用比字符串更优化的输出格式,并添加滚动策略,以便自动创建新文件。
文档展示了如何初始化Row或Bulk编码的文件接收器,它们使用其他输出编码并且更加灵活,但其思想将是相似的。