正确配置 Kafka Connect S3 接收器基于时间的分区程序



我正在尝试使用Confluent S3接收器的TimeBasedPartitioner。这是我的配置:

{  
"name":"s3-sink",
"config":{  
    "connector.class":"io.confluent.connect.s3.S3SinkConnector",
    "tasks.max":"1",
    "file":"test.sink.txt",
    "topics":"xxxxx",
    "s3.region":"yyyyyy",
    "s3.bucket.name":"zzzzzzz",
    "s3.part.size":"5242880",
    "flush.size":"1000",
    "storage.class":"io.confluent.connect.s3.storage.S3Storage",
    "format.class":"io.confluent.connect.s3.format.avro.AvroFormat",
    "schema.generator.class":"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "timestamp.extractor":"Record",
    "timestamp.field":"local_timestamp",
    "path.format":"YYYY-MM-dd-HH",
    "partition.duration.ms":"3600000",
    "schema.compatibility":"NONE"
}

}

数据是二进制的,我使用它 avro 方案。我想使用实际的记录字段"local_timestamp",这是一个 UNIX 时间戳来划分数据,比如说每小时一次。

我使用通常的 REST API 调用启动连接器

curl -X POST -H "Content-Type: application/json" --data @s3-config.json http://localhost:8083/connectors

不幸的是,数据没有按照我的意愿进行分区。我还尝试删除刷新大小,因为这可能会干扰。但后来我得到了错误

{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):nMissing required configuration "flush.size" which has no default value.nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"}%

知道如何正确设置基于时间的Partioner吗?我找不到工作示例。

此外,如何调试此类问题或进一步了解连接器的实际操作?

非常感谢任何帮助或进一步的建议。

在研究了TimeBasedPartitioner的代码之后.java以及日志

confluent log connect tail -f

我意识到时区和区域设置都是强制性的,尽管在 Confluent S3 连接器文档中没有指定。以下配置字段解决了这个问题,让我上传正确分区到 S3 存储桶的记录:

"flush.size": "10000",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"locale": "US",
"timezone": "UTC",
"partition.duration.ms": "3600000",
"timestamp.extractor": "RecordField",
"timestamp.field": "local_timestamp",

请注意另外两件事:首先,flush.size 的值也是必需的,文件最终被分区为较小的块,不超过 flush.size 指定的大块。其次,最好选择如上所示的 path.format,以便生成适当的树结构。

我仍然不能 100% 确定是否真的使用记录字段local_timestamp对记录进行分区。

非常欢迎任何意见或改进。

事实上,

您修改后的配置似乎是正确的。

具体而言,将timestamp.extractor设置为 RecordField 允许您根据记录具有的时间戳字段对文件进行分区,并且您可以通过设置属性 timestamp.field 来标识该字段。

相反,当设置timestamp.extractor=Record时,基于时间的分区程序将为每个记录使用 Kafka 时间戳。

关于flush.size,将此属性设置为高值(例如 Integer.MAX_VALUE (实际上是忽略它的同义词。

最后,最新版本的连接器不再需要schema.generator.class

相关内容

  • 没有找到相关文章

最新更新