使用 kafka-connect 将数据摄取到 s3 时,如何根据 json 字段的一部分进行分区



我正在尝试使用 s3-sink kafka 连接器将一些 json 数据存储到 s3 中。我的 json 格式如下:

{
   "server": someserver,
   "id": someid,
   "time": "2018-01-18T23:47:03.737487Z"
}

我想根据数据所在的小时对数据进行分区,但忽略分钟和秒。 例如。 以上 JSON 将属于 2018-01-18T23 目录。我应该如何在属性文件中设置 field.partition 来实现这一点?

多谢!

使用 Confluent 的 S3 连接器实现您所描述的粗略方法是:

  • 定义要RecordField的属性timestamp.extractor,以从记录上的字段中提取时间戳。
  • 将属性timestamp.field设置为该记录字段的名称(示例中time(
  • 设置path.format属性。这将允许您将文件存储到每小时,如您在示例中提到的,忽略更精细的粒度(分钟、秒等(。
  • 此外,partition.duration.ms设置为对您有意义的粒度。重要的是,默认值 -1 将不允许您使用基于时间的分区。
  • 最后,如果您使用的是预定义的分区程序
  • 之一或相关的基于时间的自定义分区程序,请设置属性localetimezone

请注意,连接器附带了一个预定义的基于时间的分区程序类,您可能会发现它对您的用例很有用。您可以通过设置来使用它:

partitioner.class=io.confluent.connect.storage.partitioner.HourlyPartitioner

相关内容

  • 没有找到相关文章

最新更新