如何使用Flink中的AmazonS3对象



问题

我们有一个ApacheFlink应用程序,它被设计为从Kafka读取事件,并将计算结果发送到ElasticSearch中。由于一些资源问题,我们不得不从Kafka回退到AmazonS3。

这些消息以ndjson格式小批量发布到AmazonS3存储桶中。

文件的组织方式如下:/{year}/{month}/{day}/{hour}
因此,我们每小时都会创建一个新文件夹,用于存储最近的事件。

设计

正如我们所看到的,AmazonS3可以在创建新对象时发出通知
我们可以将这些通知推送到SQS或Lambda中。

  • 正如本主题中所述,Flink不支持SQS
  • 在Lambda的情况下,我们可以获得S3对象并将其推送到Kinesis数据流上

我们还找到了避免编写自定义Lambda函数的替代解决方案:

  • 通过AWS数据迁移服务
  • Via Athena

问题

但在所有情况下,我们最终都使用了KDS。有没有其他选择可以在创建对象时将数据从AmazonS3推送到Flink?

一种解决方案是使用readFile方法扫描s3存储桶中的新对象。当使用FileProcessingMode.PROCESS_CONTINUOUSLY和适当的轮询间隔进行配置时,这可以很好地工作。关键是用自定义的FilePathFilter定义一个TextInputFormat。文件路径过滤器将通过";目录";在s3 bucket中,由于您已经用日期部分构建了它们,递归可以在不扫描bucket中的大量对象的情况下找到新文件。

以下是自定义FilePathFilter的外观。我一直在使用类似的代码,每隔几分钟就会发现数百个新文件,而且它工作起来没有任何问题。

public class S3FilePathFilter extends FilePathFilter {
Pattern datePartsFromPath = Pattern.compile("\/(?<year>\d{4})\/?(?<month>\d{2})?\/?(?<day>\d{2})?\/?(?<hour>\d{2})?");
private final Duration ageLimit;
public S3FilePathFilter(Duration ageLimit) {
this.ageLimit=ageLimit;
}

@Override
public boolean filterPath(Path filePath) {
Matcher matcher = datePartsFromPath.matcher(filePath.toString());
if (matcher.find()) {
ZonedDateTime limit = ZonedDateTime.now(ZoneId.of("UTC")).minus(ageLimit);
int year = NumberUtils.toInt(matcher.group("year"));
int month = NumberUtils.toInt(matcher.group("month"), limit.getMonthValue());
int day = NumberUtils.toInt(matcher.group("day"), limit.getDayOfMonth());
int hour = NumberUtils.toInt(matcher.group("hour"), limit.getHour());
if (year != limit.getYear()) {
return year < limit.getYear();
}
if (month != limit.getMonthValue()) {
return month < limit.getMonthValue();
}
if (day != limit.getDayOfMonth()) {
return day < limit.getDayOfMonth();
}
return hour < limit.getHour();
}
return true;
}
}

最新更新