Message Hub & Confluent Kafka Connect S3



我需要将来自 IBM MHub 主题的消息使用 IBM 对象存储。

我让它与本地 Kafka 服务器配合使用 Confluent Kafka Connect S3 插件作为接收器 Amazon S3 存储桶和文件的独立工作线程。两者都是成功的。

如果我将 Confluent Kafka Connect S3 配置为 IBM MHub 集群的分布式工作线程,则不会收到任何错误,但仍然没有消息最终发送到 Amazon S3 存储桶。我也尝试了文件接收器,也没有运气。

可能吗?

可以尝试使用消息中心(现在称为事件流( 云对象存储桥 : https://console.bluemix.net/docs/services/MessageHub/messagehub115.html#cloud_object_storage_bridge

似乎符合您的要求?

来自: https://kafka.apache.org/documentation/#connect_running

此处配置的参数适用于 Kafka Connect 用于访问配置、偏移和状态主题的生产者和使用者。对于 Kafka 源和 Kafka 接收器任务的配置,可以使用相同的参数,但需要以 consumer 为前缀。和制片人。分别。从工作线程配置继承的唯一参数是 bootstrap.servers,在大多数情况下就足够了,因为同一集群通常用于所有目的。一个值得注意的例外是安全群集,它需要额外的参数来允许连接。这些参数最多需要在辅助角色配置中设置三次,一次用于管理访问,一次用于 Kafka 接收器,一次用于 Kafka 源。

因此,解决方案是将带有 consumer. 前缀的重复配置添加到辅助角色配置中,以便进行所需的sasl_ssl设置,而不是接收器使用者上的默认值。

IBM Cloud Object Storage 也同样有效。需要凭据,例如。env vars: AWS_ACCESS_KEY_ID="see cos credentials" & AWS_SECRET_ACCESS_KEY="see cos credentials">

连接器配置:

{
"name": "s3-sink",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "5",
"topics": "your-topic",
"s3.region": "eu-central-1",
"store.url": "https://s3.eu-geo.objectstorage.softlayer.net",
"s3.bucket.name": "your-bucket",
"s3.part.size": "5242880",
"flush.size": "1",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"schema.compatibility": "NONE",
"name": "s3-sink"
}

}

最新更新