如何从 IoT 中心为每个新生成者添加新的 Kafka 主题



我正在研究Azure云解决方案。我正在使用连接到 Kafka 的 IoT 中心来处理来自各种 IoT 设备的数据。我面临的是来自多个设备的所有数据都存储在同一个主题中。但是,我想将连接到 IoT 中心的每台设备的数据处理到 Kafka 中的特定主题(每个设备都有自己的 Kafka 主题(

Toketi "Kafka Connect Source Connector for Azure IoT Hub" 提供以下配置文件(边缘节点(

connector.class=com.microsoft.azure.iot.kafka.connect.source.IotHubSourceConnector
name=AzureIotHubConnector
tasks.max=1
Kafka.Topic=IotTopic
IotHub.EventHubCompatibleName=iothub-toketi
IotHub.EventHubCompatibleEndpoint=sb://iothub-001.servicebus.windows.net/
IotHub.AccessKeyName=service
IotHub.AccessKeyValue=4KsdfiB9J899a+N3iwerjKwzeqbZUj1K//KKj1ye9i3=
IotHub.ConsumerGroup=$Default
IotHub.Partitions=4
IotHub.StartTime=2016-11-28T00:00:00Z
IotHub.Offsets=
BatchSize=100
ReceiveTimeout=60

它适用于一个主题来存储来自多个设备的所有数据,但我希望在来自设备的数据之间进行隔离

任何解决方案或想法!

谢谢

解决方案之一是使用 SMT(单消息转换(。

源连接器流包含几个步骤:

  • 将来自外部源的数据轮询为List<SourceRecord>
  • 使用定义的 SMT 转换每条消息 ( SourceRecord ((如果未定义转换,则可以跳过
  • SourceRecord的键和值转换为字节数组。
  • 通过KafkaProducer向卡夫卡发送消息

Kafka Connect 根据SourceRecord::topic字段确定向哪个主题发送消息。使用 SMT,您可以设置正确的主题值。

Pure Apache Kafka Connect 没有这样的转换。如果您使用的是 Confluent 平台,则可以使用一些额外的转换。要提取主题名称,您可以使用提取主题。它有属性,称为field

有关SMT整个概念的更多信息,请访问Apache Kafka网页或Confluent网页

相关内容

  • 没有找到相关文章

最新更新