How to set a TAG when sending data from Kafka to InfluxDB



I am using the Confluent InfluxDB Connector to send data from Kafka to InfluxDB. The configuration looks like this:

connector.class=io.confluent.influxdb.InfluxDBSinkConnector
influxdb.url=myurl
topics=mytopic
tasks.max=1

The schema looks like this:

{
  "type": "record",
  "name": "myrecord",
  "fields": [
    {
      "name": "sn",
      "type": "string"
    },
    {
      "name": "value",
      "type": "float"
    },
    {
      "name": "tagnum",
      "type": "string"
    }
  ]
}

When the data is sent from Kafka to InfluxDB, every data item is written as a FIELD.

When sending data from Kafka to InfluxDB with the InfluxDB connector, how can I make some data items TAGs instead, for example "tagnum"?

Your schema needs to look like this. The important part is that your tags go in a map field.

{
    "type": "struct",
    "fields": [
        { "type": "map", "keys": { "type": "string", "optional": false }, "values": { "type": "string", "optional": false }, "optional": false, "field": "tags" },
        { "field": "sn", "optional": false, "type": "string" },
        { "field": "value", "optional": false, "type": "float" }
    ],
    "optional": false,
    "version": 1
}
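If your producer currently emits flat records like the ones in the question, you have to move the tag values into that map before producing. Below is a minimal sketch of that conversion, assuming the JsonConverter with schemas.enable=true (as in the connector config further down); the helper name to_connect_envelope is hypothetical, and the schema is hard-coded to the sn/value/tagnum fields from the question.

```python
import json

# Same schema as above: a required string->string map named "tags",
# plus the remaining fields, which end up as InfluxDB fields.
SCHEMA = {
    "type": "struct",
    "fields": [
        {"type": "map",
         "keys": {"type": "string", "optional": False},
         "values": {"type": "string", "optional": False},
         "optional": False, "field": "tags"},
        {"field": "sn", "optional": False, "type": "string"},
        {"field": "value", "optional": False, "type": "float"},
    ],
    "optional": False,
    "version": 1,
}


def to_connect_envelope(record, tag_keys=("tagnum",)):
    """Hypothetical helper: move tag_keys of a flat record into the "tags" map
    and wrap it in the schema/payload envelope expected by JsonConverter."""
    # Map values must be strings to match the "values" type in SCHEMA.
    payload = {"tags": {k: str(record[k]) for k in tag_keys}}
    payload.update({k: v for k, v in record.items() if k not in tag_keys})
    # json.dumps without indent yields a single line, i.e. one kafkacat message.
    return json.dumps({"schema": SCHEMA, "payload": payload})


if __name__ == "__main__":
    print(to_connect_envelope({"sn": "FOO", "value": 500.0, "tagnum": "5"}))
```

Each printed line can be piped straight into kafkacat -P, since kafkacat treats every input line as one message.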

Here is an example that sends a JSON payload and then creates the connector:

kafkacat -b localhost:9092 -P -t testdata-json4 <<EOF
{ "schema": { "type": "struct", "fields": [ { "type": "map", "keys": { "type": "string", "optional": false }, "values": { "type": "string", "optional": false }, "optional": false, "field": "tags" }, { "field": "sn", "optional": false, "type": "string" }, { "field": "value", "optional": false, "type": "float" } ], "optional": false, "version": 1 }, "payload": { "tags": { "tagnum": "5" }, "sn": "FOO", "value": 500.0 } }
EOF
curl -i -X PUT -H "Accept:application/json" \
        -H  "Content-Type:application/json" http://localhost:8083/connectors/SINK_INFLUX_01/config \
        -d '{
            "connector.class"               : "io.confluent.influxdb.InfluxDBSinkConnector",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "true",
            "key.converter"                 : "org.apache.kafka.connect.storage.StringConverter",
            "topics"                        : "testdata-json4",
            "influxdb.url"                  : "http://influxdb:8086",
            "influxdb.db"                   : "my_db",
            "measurement.name.format"       : "${topic}"
        }'
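If you would rather create the connector from code than with curl, the same PUT request can be built with the Python standard library. This is a sketch that assumes the Kafka Connect REST endpoint and connector name from the curl command above; build_config_request is a hypothetical helper that only constructs the request.

```python
import json
import urllib.request

# Assumed Connect REST endpoint and connector name, copied from the curl example.
CONNECT_URL = "http://localhost:8083/connectors/SINK_INFLUX_01/config"

CONFIG = {
    "connector.class": "io.confluent.influxdb.InfluxDBSinkConnector",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "topics": "testdata-json4",
    "influxdb.url": "http://influxdb:8086",
    "influxdb.db": "my_db",
    # Literal string: the connector substitutes the topic name, not Python.
    "measurement.name.format": "${topic}",
}


def build_config_request(url=CONNECT_URL, config=CONFIG):
    """Build (but do not send) the same PUT request as the curl command."""
    return urllib.request.Request(
        url,
        data=json.dumps(config).encode("utf-8"),
        headers={"Accept": "application/json",
                 "Content-Type": "application/json"},
        method="PUT",
    )
```

To actually send it, pass the request to urllib.request.urlopen(build_config_request()) while a Connect worker is listening on that URL. PUT to /connectors/<name>/config creates the connector if it does not exist and updates it otherwise, which is why it is used here instead of POST.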

Result in InfluxDB:

$ influx -execute 'SELECT * FROM "testdata-json4" GROUP BY tagnum;' -database "my_db"
name: testdata-json4
tags: tagnum=5
time                sn  value
----                --  -----
1579713749737000000 FOO 500
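To see why tagnum now shows up under "tags:" while sn and value stay in the columns, it helps to look at the point in InfluxDB 1.x line protocol, where tags follow the measurement name and fields come after the first space. The sketch below is only an illustration of that tag/field split, not necessarily how the connector writes data internally; it does no escaping of spaces, commas or "=" in names, which this example does not need.

```python
def _field_value(v):
    # Strings are double-quoted, integers get an "i" suffix, floats are bare.
    if isinstance(v, bool):  # check bool first: bool is a subclass of int
        return "true" if v else "false"
    if isinstance(v, int):
        return f"{v}i"
    if isinstance(v, float):
        return repr(v)
    return '"' + str(v).replace("\\", "\\\\").replace('"', '\\"') + '"'


def to_line_protocol(measurement, tags, fields, ts_ns):
    """Format one point as: measurement,tag=v,... field=v,... timestamp."""
    tag_part = "".join(f",{k}={v}" for k, v in sorted(tags.items()))
    field_part = ",".join(f"{k}={_field_value(v)}" for k, v in sorted(fields.items()))
    return f"{measurement}{tag_part} {field_part} {ts_ns}"


if __name__ == "__main__":
    print(to_line_protocol("testdata-json4", {"tagnum": "5"},
                           {"sn": "FOO", "value": 500.0}, 1579713749737000000))
    # prints: testdata-json4,tagnum=5 sn="FOO",value=500.0 1579713749737000000
```

Without the tags map, tagnum would land in the field part as tagnum="5", which is exactly the behaviour described in the question.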

Reference: https://rmoff.net/2020/01/23/notes-on-getting-data-into-influxdb-from-kafka-with-kafka-connect/
