我正在尝试使用 s3-sink kafka 连接器将一些 json 数据存储到 s3 中。我的 json 格式如下:
{
"server": someserver,
"id": someid,
"time": "2018-01-18T23:47:03.737487Z"
}
我想根据数据所在的小时对数据进行分区,但忽略分钟和秒。 例如。 以上 JSON 将属于 2018-01-18T23 目录。我应该如何在属性文件中设置 field.partition 来实现这一点?
多谢!
使用 Confluent 的 S3 连接器实现您所描述的粗略方法是:
- 定义要
RecordField
的属性timestamp.extractor
,以从记录上的字段中提取时间戳。 - 将属性
timestamp.field
设置为该记录字段的名称(示例中time
( - 设置
path.format
属性。这将允许您将文件存储到每小时,如您在示例中提到的,忽略更精细的粒度(分钟、秒等(。 - 此外,
partition.duration.ms
设置为对您有意义的粒度。重要的是,默认值-1
将不允许您使用基于时间的分区。
最后,如果您使用的是预定义的分区程序 - 之一或相关的基于时间的自定义分区程序,请设置属性
locale
和timezone
。
请注意,连接器附带了一个预定义的基于时间的分区程序类,您可能会发现它对您的用例很有用。您可以通过设置来使用它:
partitioner.class=io.confluent.connect.storage.partitioner.HourlyPartitioner